diff --git a/br/pkg/lightning/importer/check_info_test.go b/br/pkg/lightning/importer/check_info_test.go index 27105aeeb536c..71e83f388303e 100644 --- a/br/pkg/lightning/importer/check_info_test.go +++ b/br/pkg/lightning/importer/check_info_test.go @@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) { dbMetas, preInfoGetter, nil, + nil, ) preInfoGetter.dbInfosCache = rc.dbInfos err = rc.checkCSVHeader(ctx) @@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) { dbMetas, preInfoGetter, nil, + nil, ) rc := &Controller{ @@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) { nil, preInfoGetter, nil, + nil, ) rc := &Controller{ cfg: cfg, diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 72353daa7be82..9ae64c784403a 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -464,7 +464,7 @@ func NewImportControllerWithPauser( } preCheckBuilder := NewPrecheckItemBuilder( - cfg, p.DBMetas, preInfoGetter, cpdb, + cfg, p.DBMetas, preInfoGetter, cpdb, pdCli, ) rc := &Controller{ @@ -525,6 +525,8 @@ func (rc *Controller) Close() { // Run starts the restore task. func (rc *Controller) Run(ctx context.Context) error { + failpoint.Inject("beforeRun", func() {}) + opts := []func(context.Context) error{ rc.setGlobalVariables, rc.restoreSchema, @@ -1433,7 +1435,7 @@ const ( func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) { tlsOpt := rc.tls.ToPDSecurityOption() - pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt) + pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt) if err != nil { return nil, errors.Trace(err) } @@ -1594,8 +1596,13 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { } // Disable GC because TiDB enables GC already. + + currentLeaderAddr := rc.pdCli.GetLeaderAddr() + // remove URL scheme + currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://") + currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://") kvStore, err = driver.TiKVDriver{}.OpenWithOptions( - fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", rc.cfg.TiDB.PdAddr, rc.keyspaceName), + fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", currentLeaderAddr, rc.keyspaceName), driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()), ) if err != nil { @@ -1800,7 +1807,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { } func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) { - etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg) + etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetLeaderAddr()) if err != nil { return nil, errors.Trace(err) } @@ -2102,7 +2109,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex) } if isLocalBackend(rc.cfg) { - pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, + pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(), rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption()) if err != nil { return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) diff --git a/br/pkg/lightning/importer/import_test.go b/br/pkg/lightning/importer/import_test.go index 99824d16fd6bd..f311b81177498 100644 --- a/br/pkg/lightning/importer/import_test.go +++ b/br/pkg/lightning/importer/import_test.go @@ -221,7 +221,7 @@ func TestPreCheckFailed(t *testing.T) { dbMetas: make([]*mydump.MDDatabaseMeta, 0), } cpdb := panicCheckpointDB{} - theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb) + theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil) ctl := &Controller{ cfg: cfg, saveCpCh: make(chan saveCp), diff --git a/br/pkg/lightning/importer/precheck.go b/br/pkg/lightning/importer/precheck.go index 735e17f163ca2..2e4b4b5f000c7 100644 --- a/br/pkg/lightning/importer/precheck.go +++ b/br/pkg/lightning/importer/precheck.go @@ -23,15 +23,21 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte // PrecheckItemBuilder is used to build precheck items type PrecheckItemBuilder struct { - cfg *config.Config - dbMetas []*mydump.MDDatabaseMeta - preInfoGetter PreImportInfoGetter - checkpointsDB checkpoints.DB + cfg *config.Config + dbMetas []*mydump.MDDatabaseMeta + preInfoGetter PreImportInfoGetter + checkpointsDB checkpoints.DB + pdLeaderAddrGetter func() string } // NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config // pdCli **must not** be nil for local backend -func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) { +func NewPrecheckItemBuilderFromConfig( + ctx context.Context, + cfg *config.Config, + pdCli pd.Client, + opts ...ropts.PrecheckItemBuilderOption, +) (*PrecheckItemBuilder, error) { var gerr error builderCfg := new(ropts.PrecheckItemBuilderConfig) for _, o := range opts { @@ -71,7 +77,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p if err != nil { return nil, errors.Trace(err) } - return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr + return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr } // NewPrecheckItemBuilder creates a new PrecheckItemBuilder @@ -80,12 +86,21 @@ func NewPrecheckItemBuilder( dbMetas []*mydump.MDDatabaseMeta, preInfoGetter PreImportInfoGetter, checkpointsDB checkpoints.DB, + pdCli pd.Client, ) *PrecheckItemBuilder { + leaderAddrGetter := func() string { + return cfg.TiDB.PdAddr + } + // in tests we may not have a pdCli + if pdCli != nil { + leaderAddrGetter = pdCli.GetLeaderAddr + } return &PrecheckItemBuilder{ - cfg: cfg, - dbMetas: dbMetas, - preInfoGetter: preInfoGetter, - checkpointsDB: checkpointsDB, + cfg: cfg, + dbMetas: dbMetas, + preInfoGetter: preInfoGetter, + checkpointsDB: checkpointsDB, + pdLeaderAddrGetter: leaderAddrGetter, } } @@ -117,7 +132,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p case precheck.CheckLocalTempKVDir: return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil case precheck.CheckTargetUsingCDCPITR: - return NewCDCPITRCheckItem(b.cfg), nil + return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil default: return nil, errors.Errorf("unsupported check item: %v", checkID) } diff --git a/br/pkg/lightning/importer/precheck_impl.go b/br/pkg/lightning/importer/precheck_impl.go index ed3b034555906..ea8c63ff31ffd 100644 --- a/br/pkg/lightning/importer/precheck_impl.go +++ b/br/pkg/lightning/importer/precheck_impl.go @@ -747,17 +747,19 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo // CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let // caller override the Instruction message. type CDCPITRCheckItem struct { - cfg *config.Config - Instruction string + cfg *config.Config + Instruction string + leaderAddrGetter func() string // used in test etcdCli *clientv3.Client } // NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR. -func NewCDCPITRCheckItem(cfg *config.Config) precheck.Checker { +func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker { return &CDCPITRCheckItem{ - cfg: cfg, - Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.", + cfg: cfg, + Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.", + leaderAddrGetter: leaderAddrGetter, } } @@ -766,7 +768,11 @@ func (*CDCPITRCheckItem) GetCheckItemID() precheck.CheckItemID { return precheck.CheckTargetUsingCDCPITR } -func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) { +func dialEtcdWithCfg( + ctx context.Context, + cfg *config.Config, + leaderAddr string, +) (*clientv3.Client, error) { cfg2, err := cfg.ToTLS() if err != nil { return nil, err @@ -775,7 +781,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, return clientv3.New(clientv3.Config{ TLS: tlsConfig, - Endpoints: []string{cfg.TiDB.PdAddr}, + Endpoints: []string{leaderAddr}, AutoSyncInterval: 30 * time.Second, DialTimeout: 5 * time.Second, DialOptions: []grpc.DialOption{ @@ -802,7 +808,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e if ci.etcdCli == nil { var err error - ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg) + ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter()) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/importer/precheck_impl_test.go b/br/pkg/lightning/importer/precheck_impl_test.go index a7cbd4ee799b8..a7839ba821afd 100644 --- a/br/pkg/lightning/importer/precheck_impl_test.go +++ b/br/pkg/lightning/importer/precheck_impl_test.go @@ -598,7 +598,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { Backend: config.BackendLocal, }, } - ci := NewCDCPITRCheckItem(cfg) + ci := NewCDCPITRCheckItem(cfg, nil) checker := ci.(*CDCPITRCheckItem) checker.etcdCli = testEtcdCluster.RandClient() result, err := ci.Check(ctx) diff --git a/br/pkg/lightning/importer/precheck_test.go b/br/pkg/lightning/importer/precheck_test.go index 9a0a59f1b65c2..4005276fe501e 100644 --- a/br/pkg/lightning/importer/precheck_test.go +++ b/br/pkg/lightning/importer/precheck_test.go @@ -32,7 +32,7 @@ func TestPrecheckBuilderBasic(t *testing.T) { preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil) require.NoError(t, err) - theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil) for _, checkItemID := range []precheck.CheckItemID{ precheck.CheckLargeDataFile, precheck.CheckSourcePermission, diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index dcd5f02574c48..c81b1bcdff26d 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -1192,7 +1192,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() { targetInfoGetter: targetInfoGetter, srcStorage: mockStore, } - theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil) rc := &Controller{ cfg: cfg, tls: tls, @@ -1351,7 +1351,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() { targetInfoGetter: targetInfoGetter, dbMetas: dbMetas, } - theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB()) + theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil) rc := &Controller{ cfg: cfg, tls: tls, @@ -1447,7 +1447,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() { for _, ca := range cases { template := NewSimpleTemplate() cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}} - theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil) rc := &Controller{ cfg: cfg, checkTemplate: template, diff --git a/br/pkg/lightning/log/log.go b/br/pkg/lightning/log/log.go index ba9d822296dfb..ca42c5a8d42cd 100644 --- a/br/pkg/lightning/log/log.go +++ b/br/pkg/lightning/log/log.go @@ -97,7 +97,7 @@ func InitLogger(cfg *Config, _ string) error { // Disable annoying TiDB Log. // TODO: some error logs outputs randomly, we need to fix them in TiDB. // this LEVEL only affects SlowQueryLogger, later ReplaceGlobals will overwrite it. - tidbLogCfg.Level = "fatal" + tidbLogCfg.Level = "debug" // this also init GRPCLogger, controlled by GRPC_DEBUG env. err := logutil.InitLogger(&tidbLogCfg) if err != nil { diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 14dac11b19e79..b6533a30a15a2 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -329,6 +329,16 @@ func parseVersion(versionBytes []byte) *semver.Version { return version } +// TODO: always read latest PD nodes from PD client +func (p *PdController) getAllPDAddrs() []string { + ret := make([]string, 0, len(p.addrs)+1) + if p.pdClient != nil { + ret = append(ret, p.pdClient.GetLeaderAddr()) + } + ret = append(ret, p.addrs...) + return ret +} + func (p *PdController) isPauseConfigEnabled() bool { return p.version.Compare(pauseConfigVersion) >= 0 } @@ -356,7 +366,7 @@ func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) { func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) { var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { v, e := get(ctx, addr, clusterVersionPrefix, p.cli, http.MethodGet, nil) if e != nil { err = e @@ -383,7 +393,7 @@ func (p *PdController) getRegionCountWith( end = url.QueryEscape(string(codec.EncodeBytes(nil, endKey))) } var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { query := fmt.Sprintf( "%s?start_key=%s&end_key=%s", regionCountPrefix, start, end) @@ -410,7 +420,7 @@ func (p *PdController) GetStoreInfo(ctx context.Context, storeID uint64) (*pdtyp func (p *PdController) getStoreInfoWith( ctx context.Context, get pdHTTPRequest, storeID uint64) (*pdtypes.StoreInfo, error) { var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { query := fmt.Sprintf( "%s/%d", storePrefix, storeID) @@ -440,7 +450,7 @@ func (p *PdController) doPauseSchedulers(ctx context.Context, removedSchedulers := make([]string, 0, len(schedulers)) for _, scheduler := range schedulers { prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler) - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { _, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body)) if err == nil { removedSchedulers = append(removedSchedulers, scheduler) @@ -523,7 +533,7 @@ func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []str } for _, scheduler := range schedulers { prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler) - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { _, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body)) if err == nil { break @@ -547,7 +557,7 @@ func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) { func (p *PdController) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) { var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { v, e := get(ctx, addr, schedulerPrefix, p.cli, http.MethodGet, nil) if e != nil { err = e @@ -569,7 +579,7 @@ func (p *PdController) GetPDScheduleConfig( ctx context.Context, ) (map[string]interface{}, error) { var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { v, e := pdRequest( ctx, addr, scheduleConfigPrefix, p.cli, http.MethodGet, nil) if e != nil { @@ -607,7 +617,7 @@ func (p *PdController) doUpdatePDScheduleConfig( newCfg[sc] = v } - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { reqData, err := json.Marshal(newCfg) if err != nil { return errors.Trace(err) @@ -835,7 +845,7 @@ func (p *PdController) doRemoveSchedulersWith( // GetMinResolvedTS get min-resolved-ts from pd func (p *PdController) GetMinResolvedTS(ctx context.Context) (uint64, error) { var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, nil) if e != nil { log.Warn("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e)) @@ -869,7 +879,7 @@ func (p *PdController) RecoverBaseAllocID(ctx context.Context, id uint64) error ID: fmt.Sprintf("%d", id), }) var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { _, e := pdRequest(ctx, addr, baseAllocIDPrefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData)) if e != nil { log.Warn("failed to recover base alloc id", zap.String("addr", addr), zap.Error(e)) @@ -893,7 +903,7 @@ func (p *PdController) ResetTS(ctx context.Context, ts uint64) error { ForceUseLarger: true, }) var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { code, _, e := pdRequestWithCode(ctx, addr, resetTSPrefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData)) if e != nil { // for pd version <= 6.2, if the given ts < current ts of pd, pd returns StatusForbidden. @@ -923,7 +933,7 @@ func (p *PdController) UnmarkRecovering(ctx context.Context) error { func (p *PdController) operateRecoveringMark(ctx context.Context, method string) error { var err error - for _, addr := range p.addrs { + for _, addr := range p.getAllPDAddrs() { _, e := pdRequest(ctx, addr, recoveringMarkPrefix, p.cli, method, nil) if e != nil { log.Warn("failed to operate recovering mark", zap.String("method", method), @@ -968,7 +978,8 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L panic(err) } var lastErr error - for i, addr := range p.addrs { + addrs := p.getAllPDAddrs() + for i, addr := range addrs { _, lastErr = pdRequest(ctx, addr, regionLabelPrefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData)) if lastErr == nil { @@ -978,7 +989,7 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L return errors.Trace(lastErr) } - if i < len(p.addrs) { + if i < len(addrs) { log.Warn("failed to create or update region label rule, will try next pd address", zap.Error(lastErr), zap.String("pdAddr", addr)) } @@ -989,7 +1000,8 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L // DeleteRegionLabelRule deletes a region label rule. func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) error { var lastErr error - for i, addr := range p.addrs { + addrs := p.getAllPDAddrs() + for i, addr := range addrs { _, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", regionLabelPrefix, ruleID), p.cli, http.MethodDelete, nil) if lastErr == nil { @@ -999,7 +1011,7 @@ func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) return errors.Trace(lastErr) } - if i < len(p.addrs) { + if i < len(addrs) { log.Warn("failed to delete region label rule, will try next pd address", zap.Error(lastErr), zap.String("pdAddr", addr)) } diff --git a/br/tests/_utils/run_services b/br/tests/_utils/run_services index 6021b92cfa6b3..ab8f78012bfc0 100644 --- a/br/tests/_utils/run_services +++ b/br/tests/_utils/run_services @@ -18,6 +18,7 @@ set -eu export PD_PEER_ADDR="127.0.0.1:2380" export PD_ADDR="127.0.0.1:2379" +export PD_PID="${TEST_DIR:?}/pd_pid.txt" export TIDB_IP="127.0.0.1" export TIDB_PORT="4000" export TIDB_ADDR="127.0.0.1:4000" @@ -75,13 +76,15 @@ start_services() { start_pd() { echo "Starting PD..." - mkdir -p "$TEST_DIR/pd" + mkdir -p "$TEST_DIR/pd" -m 700 pd-server \ --client-urls "https://$PD_ADDR" \ --peer-urls "https://$PD_PEER_ADDR" \ --log-file "$TEST_DIR/pd.log" \ --data-dir "$TEST_DIR/pd" \ --config $PD_CONFIG & + pid=$! + echo -e "$pid" > "${PD_PID}" # wait until PD is online... i=0 while ! run_curl "https://$PD_ADDR/pd/api/v1/version"; do diff --git a/br/tests/lightning_pd_leader_switch/config.toml b/br/tests/lightning_pd_leader_switch/config.toml new file mode 100644 index 0000000000000..73c54882430c7 --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/config.toml @@ -0,0 +1,14 @@ +[lightning] +table-concurrency = 1 +index-concurrency = 1 + +[checkpoint] +enable = true +driver = "file" +schema = "tidb_lightning_checkpoint_local_backend_test" + +[tikv-importer] +send-kv-pairs = 2 + +[mydumper] +batch-size = 50 # force splitting the data into 4 batches diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng-schema-create.sql b/br/tests/lightning_pd_leader_switch/data/cpeng-schema-create.sql new file mode 100644 index 0000000000000..1e23466eeee52 --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng-schema-create.sql @@ -0,0 +1 @@ +create database cpeng; diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a-schema.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a-schema.sql new file mode 100644 index 0000000000000..6c1f5ee154c58 --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a-schema.sql @@ -0,0 +1 @@ +create table a (c VARCHAR(20) PRIMARY KEY); diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a.1.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a.1.sql new file mode 100644 index 0000000000000..a75039e1304e3 --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a.1.sql @@ -0,0 +1 @@ +insert into a values ('0000001'); diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a.2.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a.2.sql new file mode 100644 index 0000000000000..a1b15acdecb11 --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a.2.sql @@ -0,0 +1 @@ +insert into a values ('0000002'); diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.a.3.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.a.3.sql new file mode 100644 index 0000000000000..0a457febecf55 --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng.a.3.sql @@ -0,0 +1 @@ +insert into a values ('0000003'),('0000004'); diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.b-schema.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.b-schema.sql new file mode 100644 index 0000000000000..4a3c844ef8b3f --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng.b-schema.sql @@ -0,0 +1 @@ +create table b (c int); diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.b.1.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.b.1.sql new file mode 100644 index 0000000000000..cadf0227f99ea --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng.b.1.sql @@ -0,0 +1,4 @@ +insert into b values (10),(11),(12); +/* +padding to make the data file > 50 bytes +*/ diff --git a/br/tests/lightning_pd_leader_switch/data/cpeng.b.2.sql b/br/tests/lightning_pd_leader_switch/data/cpeng.b.2.sql new file mode 100644 index 0000000000000..83045aee9ebaa --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/data/cpeng.b.2.sql @@ -0,0 +1 @@ +insert into b values (13); diff --git a/br/tests/lightning_pd_leader_switch/run.sh b/br/tests/lightning_pd_leader_switch/run.sh new file mode 100644 index 0000000000000..fc43bad254feb --- /dev/null +++ b/br/tests/lightning_pd_leader_switch/run.sh @@ -0,0 +1,67 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -eux + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +. $cur/../_utils/run_services +PD_CONFIG=${PD_CONFIG:-"$cur/../config/pd.toml"} +TIDB_CONFIG=${TIDB_CONFIG:-"$cur/../config/tidb.toml"} + +bin/pd-server --join "https://$PD_ADDR" \ + --client-urls "https://${PD_ADDR}2" \ + --peer-urls "https://${PD_PEER_ADDR}2" \ + --log-file "$TEST_DIR/pd2.log" \ + --data-dir "$TEST_DIR/pd2" \ + --name pd2 \ + --config $PD_CONFIG & + +# strange that new PD can't join too quickly +sleep 10 + +bin/pd-server --join "https://$PD_ADDR" \ + --client-urls "https://${PD_ADDR}3" \ + --peer-urls "https://${PD_PEER_ADDR}3" \ + --log-file "$TEST_DIR/pd3.log" \ + --data-dir "$TEST_DIR/pd3" \ + --name pd3 \ + --config $PD_CONFIG & + +# restart TiDB to let TiDB load new PD nodes +killall tidb-server +# wait for TiDB to exit to release file lock +sleep 5 +start_tidb + +export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/importer/beforeRun=sleep(60000)' +run_lightning --backend local --enable-checkpoint=0 & +lightning_pid=$! +# in many libraries, etcd client's auto-sync-interval is 30s, so we need to wait at least 30s before kill PD leader +sleep 45 +kill $(cat /tmp/backup_restore_test/pd_pid.txt) + +# Check that everything is correctly imported +wait $lightning_pid +run_sql 'SELECT count(*), sum(c) FROM cpeng.a' +check_contains 'count(*): 4' +check_contains 'sum(c): 10' + +run_sql 'SELECT count(*), sum(c) FROM cpeng.b' +check_contains 'count(*): 4' +check_contains 'sum(c): 46' + +restart_services diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh index a3a8ec6afa234..39068ab078427 100755 --- a/br/tests/run_group.sh +++ b/br/tests/run_group.sh @@ -23,7 +23,7 @@ groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" - ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn' @@ -37,7 +37,7 @@ groups=( ["G14"]='lightning_issue_40657 lightning_issue_410 lightning_issue_519 lightning_local_backend lightning_max_incr lightning_max_random lightning_multi_valued_index lightning_new_collation lightning_no_schema' ["G15"]='lightning_parquet lightning_partition_incremental lightning_partitioned-table lightning_record_network lightning_reload_cert lightning_restore lightning_routes lightning_routes_panic lightning_row-format-v2 lightning_s3' ["G16"]='lightning_shard_rowid lightning_source_linkfile lightning_sqlmode lightning_tidb_duplicate_data lightning_tidb_rowid lightning_tiflash lightning_tikv_multi_rocksdb lightning_too_many_columns lightning_tool_135' - ["G17"]='lightning_tool_1420 lightning_tool_1472 lightning_tool_241 lightning_ttl lightning_unused_config_keys lightning_various_types lightning_view lightning_write_batch lightning_write_limit' + ["G17"]='lightning_tool_1420 lightning_tool_1472 lightning_tool_241 lightning_ttl lightning_unused_config_keys lightning_various_types lightning_view lightning_write_batch lightning_write_limit lightning_pd_leader_switch' ) # Get other cases not in groups, to avoid missing any case