diff --git a/br/pkg/lightning/restore/check_info_test.go b/br/pkg/lightning/restore/check_info_test.go
index 36903ab93b22c..9155dcdc73bda 100644
--- a/br/pkg/lightning/restore/check_info_test.go
+++ b/br/pkg/lightning/restore/check_info_test.go
@@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
 		dbMetas,
 		preInfoGetter,
 		nil,
+		nil,
 	)
 	preInfoGetter.dbInfosCache = rc.dbInfos
 	err = rc.checkCSVHeader(ctx)
@@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
 		dbMetas,
 		preInfoGetter,
 		nil,
+		nil,
 	)
 
 	rc := &Controller{
@@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
 		nil,
 		preInfoGetter,
 		nil,
+		nil,
 	)
 	rc := &Controller{
 		cfg: cfg,
diff --git a/br/pkg/lightning/restore/precheck.go b/br/pkg/lightning/restore/precheck.go
index 4e987c757c8e8..dd3bb11180d44 100644
--- a/br/pkg/lightning/restore/precheck.go
+++ b/br/pkg/lightning/restore/precheck.go
@@ -52,13 +52,21 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte
 }
 
 type PrecheckItemBuilder struct {
-	cfg           *config.Config
-	dbMetas       []*mydump.MDDatabaseMeta
-	preInfoGetter PreRestoreInfoGetter
-	checkpointsDB checkpoints.DB
+	cfg                *config.Config
+	dbMetas            []*mydump.MDDatabaseMeta
+	preInfoGetter      PreRestoreInfoGetter
+	checkpointsDB      checkpoints.DB
+	pdLeaderAddrGetter func() string
 }
 
-func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
+// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from the config.
+// pdCli must not be nil when the local backend is used.
+func NewPrecheckItemBuilderFromConfig(
+	ctx context.Context,
+	cfg *config.Config,
+	pdCli pd.Client,
+	opts ...ropts.PrecheckItemBuilderOption,
+) (*PrecheckItemBuilder, error) {
 	var gerr error
 	builderCfg := new(ropts.PrecheckItemBuilderConfig)
 	for _, o := range opts {
@@ -98,7 +106,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
+	return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr
 }
 
 func NewPrecheckItemBuilder(
@@ -106,12 +114,21 @@ func NewPrecheckItemBuilder(
 	dbMetas []*mydump.MDDatabaseMeta,
 	preInfoGetter PreRestoreInfoGetter,
 	checkpointsDB checkpoints.DB,
+	pdCli pd.Client,
 ) *PrecheckItemBuilder {
+	leaderAddrGetter := func() string {
+		return cfg.TiDB.PdAddr
+	}
+	// in tests we may not have a PD client
+	if pdCli != nil {
+		leaderAddrGetter = pdCli.GetLeaderAddr
+	}
 	return &PrecheckItemBuilder{
-		cfg:           cfg,
-		dbMetas:       dbMetas,
-		preInfoGetter: preInfoGetter,
-		checkpointsDB: checkpointsDB,
+		cfg:                cfg,
+		dbMetas:            dbMetas,
+		preInfoGetter:      preInfoGetter,
+		checkpointsDB:      checkpointsDB,
+		pdLeaderAddrGetter: leaderAddrGetter,
 	}
 }
 
@@ -142,7 +159,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
 	case CheckLocalTempKVDir:
 		return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
 	case CheckTargetUsingCDCPITR:
-		return NewCDCPITRCheckItem(b.cfg), nil
+		return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
 	default:
 		return nil, errors.Errorf("unsupported check item: %v", checkID)
 	}
diff --git a/br/pkg/lightning/restore/precheck_impl.go b/br/pkg/lightning/restore/precheck_impl.go
index ed31504bf87e9..77d833e8ae54c 100644
--- a/br/pkg/lightning/restore/precheck_impl.go
+++ b/br/pkg/lightning/restore/precheck_impl.go
@@ -682,17 +682,19 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
 // CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
 // caller override the Instruction message.
 type CDCPITRCheckItem struct {
-	cfg         *config.Config
-	Instruction string
+	cfg              *config.Config
+	Instruction      string
+	leaderAddrGetter func() string
 	// used in test
 	etcdCli *clientv3.Client
 }
 
 // NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
-func NewCDCPITRCheckItem(cfg *config.Config) PrecheckItem {
+func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) PrecheckItem {
 	return &CDCPITRCheckItem{
-		cfg:         cfg,
-		Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
+		cfg:              cfg,
+		Instruction:      "local backend is not compatible with them. Please switch to tidb backend then try again.",
+		leaderAddrGetter: leaderAddrGetter,
 	}
 }
 
@@ -701,7 +703,11 @@ func (ci *CDCPITRCheckItem) GetCheckItemID() CheckItemID {
 	return CheckTargetUsingCDCPITR
 }
 
-func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
+func dialEtcdWithCfg(
+	ctx context.Context,
+	cfg *config.Config,
+	leaderAddr string,
+) (*clientv3.Client, error) {
 	cfg2, err := cfg.ToTLS()
 	if err != nil {
 		return nil, err
 	}
@@ -710,7 +716,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,
 	return clientv3.New(clientv3.Config{
 		TLS:              tlsConfig,
-		Endpoints:        []string{cfg.TiDB.PdAddr},
+		Endpoints:        []string{leaderAddr},
 		AutoSyncInterval: 30 * time.Second,
 		DialTimeout:      5 * time.Second,
 		DialOptions: []grpc.DialOption{
@@ -741,7 +747,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) {
 
 	if ci.etcdCli == nil {
 		var err error
-		ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
+		ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter())
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
diff --git a/br/pkg/lightning/restore/precheck_impl_test.go b/br/pkg/lightning/restore/precheck_impl_test.go
index 6749b905ff8bc..2a1ecf63c1acf 100644
--- a/br/pkg/lightning/restore/precheck_impl_test.go
+++ b/br/pkg/lightning/restore/precheck_impl_test.go
@@ -597,7 +597,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
 			Backend: config.BackendLocal,
 		},
 	}
-	ci := NewCDCPITRCheckItem(cfg)
+	ci := NewCDCPITRCheckItem(cfg, nil)
 	checker := ci.(*CDCPITRCheckItem)
 	checker.etcdCli = testEtcdCluster.RandClient()
 	result, err := ci.Check(ctx)
diff --git a/br/pkg/lightning/restore/precheck_test.go b/br/pkg/lightning/restore/precheck_test.go
index 8bb32e9b5f67c..a5ceab62dd0a7 100644
--- a/br/pkg/lightning/restore/precheck_test.go
+++ b/br/pkg/lightning/restore/precheck_test.go
@@ -31,7 +31,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {
 	preInfoGetter, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
 	require.NoError(t, err)
 
-	theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil)
+	theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
 	for _, checkItemID := range []CheckItemID{
 		CheckLargeDataFile,
 		CheckSourcePermission,
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 973f5aed4dab3..4501a184106b0 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -413,7 +413,7 @@ func NewRestoreControllerWithPauser(
 	}
 
 	preCheckBuilder := NewPrecheckItemBuilder(
-		cfg, p.DBMetas, preInfoGetter, cpdb,
+		cfg, p.DBMetas, preInfoGetter, cpdb, pdCli,
 	)
 
 	rc := &Controller{
@@ -462,6 +462,8 @@ func (rc *Controller) Close() {
 }
 
 func (rc *Controller) Run(ctx context.Context) error {
+	failpoint.Inject("beforeRun", func() {})
+
 	opts := []func(context.Context) error{
 		rc.setGlobalVariables,
 		rc.restoreSchema,
@@ -1351,7 +1353,7 @@ const (
 
 func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
 	tlsOpt := rc.tls.ToPDSecurityOption()
-	pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
+	pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -1512,8 +1514,13 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 	}
 
 	// Disable GC because TiDB enables GC already.
+
+	currentLeaderAddr := rc.pdCli.GetLeaderAddr()
+	// remove the URL scheme
+	currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://")
+	currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://")
 	kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
-		fmt.Sprintf("tikv://%s?disableGC=true", rc.cfg.TiDB.PdAddr),
+		fmt.Sprintf("tikv://%s?disableGC=true", currentLeaderAddr),
 		driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
 	)
 	if err != nil {
@@ -2114,7 +2121,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
 			rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
 		}
 		if isLocalBackend(rc.cfg) {
-			pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
+			pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(),
 				rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
 			if err != nil {
 				return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go
index 88eeb137c5a1a..8dd05a8e09f7c 100644
--- a/br/pkg/lightning/restore/restore_test.go
+++ b/br/pkg/lightning/restore/restore_test.go
@@ -227,7 +227,7 @@ func TestPreCheckFailed(t *testing.T) {
 		dbMetas: make([]*mydump.MDDatabaseMeta, 0),
 	}
 	cpdb := panicCheckpointDB{}
-	theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb)
+	theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
 	ctl := &Controller{
 		cfg:      cfg,
 		saveCpCh: make(chan saveCp),
diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go
index 5f6d3027a10c0..f622d32a80b1d 100644
--- a/br/pkg/lightning/restore/table_restore_test.go
+++ b/br/pkg/lightning/restore/table_restore_test.go
@@ -1130,7 +1130,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
 		targetInfoGetter: targetInfoGetter,
 		srcStorage:       mockStore,
 	}
-	theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil)
+	theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
 	rc := &Controller{
 		cfg: cfg,
 		tls: tls,
@@ -1294,7 +1294,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
 		targetInfoGetter: targetInfoGetter,
 		dbMetas:          dbMetas,
 	}
-	theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB())
+	theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
 	rc := &Controller{
 		cfg: cfg,
 		tls: tls,
@@ -1390,7 +1390,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
 	for _, ca := range cases {
 		template = NewSimpleTemplate()
 		cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
-		theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil)
+		theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
 		rc := &Controller{
 			cfg:           cfg,
 			checkTemplate: template,
@@ -1445,7 +1445,7 @@ func (s *tableRestoreSuite) TestEstimate() {
 		targetInfoGetter: mockTarget,
 	}
 	preInfoGetter.Init()
-	theCheckBuilder := NewPrecheckItemBuilder(s.cfg, dbMetas, preInfoGetter, nil)
+	theCheckBuilder := NewPrecheckItemBuilder(s.cfg, dbMetas, preInfoGetter, nil, nil)
 	rc := &Controller{
 		cfg:           s.cfg,
 		checkTemplate: template,
diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go
index 3125d5ed18f59..6292c02fb1709 100644
--- a/br/pkg/pdutil/pd.go
+++ b/br/pkg/pdutil/pd.go
@@ -327,6 +327,16 @@ func parseVersion(versionBytes []byte) *semver.Version {
 	return version
 }
 
+// TODO: always read the latest PD nodes from the PD client
+func (p *PdController) getAllPDAddrs() []string {
+	ret := make([]string, 0, len(p.addrs)+1)
+	if p.pdClient != nil {
+		ret = append(ret, p.pdClient.GetLeaderAddr())
+	}
+	ret = append(ret, p.addrs...)
+	return ret
+}
+
 func (p *PdController) isPauseConfigEnabled() bool {
 	return p.version.Compare(pauseConfigVersion) >= 0
 }
@@ -354,7 +364,7 @@ func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) {
 
 func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) {
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		v, e := get(ctx, addr, clusterVersionPrefix, p.cli, http.MethodGet, nil)
 		if e != nil {
 			err = e
@@ -381,7 +391,7 @@ func (p *PdController) getRegionCountWith(
 		end = url.QueryEscape(string(codec.EncodeBytes(nil, endKey)))
 	}
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		query := fmt.Sprintf(
 			"%s?start_key=%s&end_key=%s",
 			regionCountPrefix, start, end)
@@ -408,7 +418,7 @@ func (p *PdController) GetStoreInfo(ctx context.Context, storeID uint64) (*pdtyp
 
 func (p *PdController) getStoreInfoWith(
 	ctx context.Context, get pdHTTPRequest, storeID uint64) (*pdtypes.StoreInfo, error) {
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		query := fmt.Sprintf(
 			"%s/%d",
 			storePrefix, storeID)
@@ -437,7 +447,7 @@ func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []strin
 	removedSchedulers := make([]string, 0, len(schedulers))
 	for _, scheduler := range schedulers {
 		prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
-		for _, addr := range p.addrs {
+		for _, addr := range p.getAllPDAddrs() {
 			_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
 			if err == nil {
 				removedSchedulers = append(removedSchedulers, scheduler)
@@ -520,7 +530,7 @@ func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []str
 	}
 	for _, scheduler := range schedulers {
 		prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
-		for _, addr := range p.addrs {
+		for _, addr := range p.getAllPDAddrs() {
 			_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
 			if err == nil {
 				break
@@ -544,7 +554,7 @@ func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) {
 
 func (p *PdController) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) {
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		v, e := get(ctx, addr, schedulerPrefix, p.cli, http.MethodGet, nil)
 		if e != nil {
 			err = e
@@ -566,7 +576,7 @@ func (p *PdController) GetPDScheduleConfig(
 	ctx context.Context,
 ) (map[string]interface{}, error) {
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		v, e := pdRequest(
 			ctx, addr, scheduleConfigPrefix, p.cli, http.MethodGet, nil)
 		if e != nil {
@@ -604,7 +614,7 @@ func (p *PdController) doUpdatePDScheduleConfig(
 		newCfg[sc] = v
 	}
 
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		reqData, err := json.Marshal(newCfg)
 		if err != nil {
 			return errors.Trace(err)
@@ -811,7 +821,7 @@ func (p *PdController) doRemoveSchedulersWith(
 
 // GetMinResolvedTS get min-resolved-ts from pd
 func (p *PdController) GetMinResolvedTS(ctx context.Context) (uint64, error) {
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, nil)
 		if e != nil {
 			log.Warn("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
@@ -845,7 +855,7 @@ func (p *PdController) RecoverBaseAllocID(ctx context.Context, id uint64) error
 		ID: fmt.Sprintf("%d", id),
 	})
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		_, e := pdRequest(ctx, addr, baseAllocIDPrefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData))
 		if e != nil {
 			log.Warn("failed to recover base alloc id", zap.String("addr", addr), zap.Error(e))
@@ -869,7 +879,7 @@ func (p *PdController) ResetTS(ctx context.Context, ts uint64) error {
 		ForceUseLarger: true,
 	})
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		code, _, e := pdRequestWithCode(ctx, addr, resetTSPrefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData))
 		if e != nil {
 			// for pd version <= 6.2, if the given ts < current ts of pd, pd returns StatusForbidden.
@@ -899,7 +909,7 @@ func (p *PdController) UnmarkRecovering(ctx context.Context) error {
 
 func (p *PdController) operateRecoveringMark(ctx context.Context, method string) error {
 	var err error
-	for _, addr := range p.addrs {
+	for _, addr := range p.getAllPDAddrs() {
 		_, e := pdRequest(ctx, addr, recoveringMarkPrefix, p.cli, method, nil)
 		if e != nil {
 			log.Warn("failed to operate recovering mark", zap.String("method", method),
@@ -944,7 +954,8 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L
 		panic(err)
 	}
 	var lastErr error
-	for i, addr := range p.addrs {
+	addrs := p.getAllPDAddrs()
+	for i, addr := range addrs {
 		_, lastErr = pdRequest(ctx, addr, regionLabelPrefix, p.cli,
 			http.MethodPost, bytes.NewBuffer(reqData))
 		if lastErr == nil {
@@ -954,7 +965,7 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L
 			return errors.Trace(lastErr)
 		}
 
-		if i < len(p.addrs) {
+		if i < len(addrs) {
 			log.Warn("failed to create or update region label rule, will try next pd address",
 				zap.Error(lastErr), zap.String("pdAddr", addr))
 		}
@@ -965,7 +976,8 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L
 
 // DeleteRegionLabelRule deletes a region label rule.
 func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) error {
 	var lastErr error
-	for i, addr := range p.addrs {
+	addrs := p.getAllPDAddrs()
+	for i, addr := range addrs {
 		_, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", regionLabelPrefix, ruleID), p.cli,
 			http.MethodDelete, nil)
 		if lastErr == nil {
@@ -975,7 +987,7 @@ func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string)
 			return errors.Trace(lastErr)
 		}
 
-		if i < len(p.addrs) {
+		if i < len(addrs) {
 			log.Warn("failed to delete region label rule, will try next pd address",
 				zap.Error(lastErr), zap.String("pdAddr", addr))
 		}
diff --git a/br/tests/_utils/run_services b/br/tests/_utils/run_services
index 7e1917150b263..afd0fded1778d 100644
--- a/br/tests/_utils/run_services
+++ b/br/tests/_utils/run_services
@@ -18,6 +18,7 @@ set -eu
 
 export PD_PEER_ADDR="127.0.0.1:2380"
 export PD_ADDR="127.0.0.1:2379"
+export PD_PID="${TEST_DIR:?}/pd_pid.txt"
 export TIDB_IP="127.0.0.1"
 export TIDB_PORT="4000"
 export TIDB_ADDR="127.0.0.1:4000"
@@ -73,13 +74,15 @@ start_services() {
 
 start_pd() {
     echo "Starting PD..."
-    mkdir -p "$TEST_DIR/pd"
+    mkdir -p "$TEST_DIR/pd" -m 700
     bin/pd-server \
         --client-urls "https://$PD_ADDR" \
         --peer-urls "https://$PD_PEER_ADDR" \
         --log-file "$TEST_DIR/pd.log" \
         --data-dir "$TEST_DIR/pd" \
         --config $PD_CONFIG &
+    pid=$!
+    echo -e "$pid" > "${PD_PID}"
     # wait until PD is online...
     i=0
     while ! run_curl "https://$PD_ADDR/pd/api/v1/version"; do
diff --git a/br/tests/lightning_local_backend/run.sh b/br/tests/lightning_local_backend/run.sh
index d59563637c4f4..60fca277d8cf6 100755
--- a/br/tests/lightning_local_backend/run.sh
+++ b/br/tests/lightning_local_backend/run.sh
@@ -36,7 +36,7 @@ grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning
 
 # First, verify that inject with not leader error is fine.
-export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/br/pkg/lightning/backend/local/failToSplit=2*return("")'
+export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/br/pkg/lightning/backend/local/failToSplit=5*return("")'
 rm -f "$TEST_DIR/lightning-local.log"
 run_sql 'DROP DATABASE IF EXISTS cpeng;'
 run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml" -L debug
diff --git a/br/tests/lightning_pd_leader_switch/config.toml b/br/tests/lightning_pd_leader_switch/config.toml
new file mode 100644
index 0000000000000..73c54882430c7
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/config.toml
@@ -0,0 +1,14 @@
+[lightning]
+table-concurrency = 1
+index-concurrency = 1
+
+[checkpoint]
+enable = true
+driver = "file"
+schema = "tidb_lightning_checkpoint_local_backend_test"
+
+[tikv-importer]
+send-kv-pairs = 2
+
+[mydumper]
+batch-size = 50 # force splitting the data into 4 batches
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng-schema-create.sql b/br/tests/lightning_pd_leader_switch/data/cpeng-schema-create.sql
new file mode 100644
index 0000000000000..1e23466eeee52
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng-schema-create.sql
@@ -0,0 +1 @@
+create database cpeng;
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a-schema.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a-schema.sql
new file mode 100644
index 0000000000000..6c1f5ee154c58
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a-schema.sql
@@ -0,0 +1 @@
+create table a (c VARCHAR(20) PRIMARY KEY);
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a.1.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a.1.sql
new file mode 100644
index 0000000000000..a75039e1304e3
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a.1.sql
@@ -0,0 +1 @@
+insert into a values ('0000001');
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a.2.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a.2.sql
new file mode 100644
index 0000000000000..a1b15acdecb11
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a.2.sql
@@ -0,0 +1 @@
+insert into a values ('0000002');
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a.3.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a.3.sql
new file mode 100644
index 0000000000000..0a457febecf55
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a.3.sql
@@ -0,0 +1 @@
+insert into a values ('0000003'),('0000004');
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.b-schema.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.b-schema.sql
new file mode 100644
index 0000000000000..4a3c844ef8b3f
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng.b-schema.sql
@@ -0,0 +1 @@
+create table b (c int);
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.b.1.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.b.1.sql
new file mode 100644
index 0000000000000..cadf0227f99ea
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng.b.1.sql
@@ -0,0 +1,4 @@
+insert into b values (10),(11),(12);
+/*
+padding to make the data file > 50 bytes
+*/
diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.b.2.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.b.2.sql
new file mode 100644
index 0000000000000..83045aee9ebaa
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/data/cpeng.b.2.sql
@@ -0,0 +1 @@
+insert into b values (13);
diff --git a/br/tests/lightning_pd_leader_switch/run.sh b/br/tests/lightning_pd_leader_switch/run.sh
new file mode 100644
index 0000000000000..fc43bad254feb
--- /dev/null
+++ b/br/tests/lightning_pd_leader_switch/run.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+#
+# Copyright 2023 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -eux
+
+cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+. $cur/../_utils/run_services
+PD_CONFIG=${PD_CONFIG:-"$cur/../config/pd.toml"}
+TIDB_CONFIG=${TIDB_CONFIG:-"$cur/../config/tidb.toml"}
+
+bin/pd-server --join "https://$PD_ADDR" \
+    --client-urls "https://${PD_ADDR}2" \
+    --peer-urls "https://${PD_PEER_ADDR}2" \
+    --log-file "$TEST_DIR/pd2.log" \
+    --data-dir "$TEST_DIR/pd2" \
+    --name pd2 \
+    --config $PD_CONFIG &
+
+# strangely, a newly added PD member can't join the cluster right away, so give it some time
+sleep 10
+
+bin/pd-server --join "https://$PD_ADDR" \
+    --client-urls "https://${PD_ADDR}3" \
+    --peer-urls "https://${PD_PEER_ADDR}3" \
+    --log-file "$TEST_DIR/pd3.log" \
+    --data-dir "$TEST_DIR/pd3" \
+    --name pd3 \
+    --config $PD_CONFIG &
+
+# restart TiDB so that it picks up the new PD nodes
+killall tidb-server
+# wait for TiDB to exit and release its file lock
+sleep 5
+start_tidb
+
+export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/importer/beforeRun=sleep(60000)'
+run_lightning --backend local --enable-checkpoint=0 &
+lightning_pid=$!
+# in many libraries the etcd client's auto-sync-interval is 30s, so we must wait at least 30s before killing the PD leader
+sleep 45
+kill $(cat "$PD_PID")
+
+# Check that everything is correctly imported
+wait $lightning_pid
+run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
+check_contains 'count(*): 4'
+check_contains 'sum(c): 10'
+
+run_sql 'SELECT count(*), sum(c) FROM cpeng.b'
+check_contains 'count(*): 4'
+check_contains 'sum(c): 46'
+
+restart_services