From 2fe8a357e746df39584844076b9280652c765c85 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 15 May 2023 17:33:17 +0800 Subject: [PATCH] lightning: add option to control pause scheduler scope (#43762) (#43809) close pingcap/tidb#43657 --- br/pkg/lightning/backend/local/local.go | 6 ++- br/pkg/lightning/config/BUILD.bazel | 2 +- br/pkg/lightning/config/config.go | 51 ++++++++++++++++--- br/pkg/lightning/config/config_test.go | 32 ++++++++++++ br/pkg/lightning/importer/import.go | 8 ++- .../lightning_csv/config-pause-global.toml | 13 +++++ br/tests/lightning_csv/run.sh | 29 +++++++++-- executor/importer/table_import.go | 1 + 8 files changed, 126 insertions(+), 16 deletions(-) create mode 100644 br/tests/lightning_csv/config-pause-global.toml diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 7bac978a6b3c6..f2d1cffef436c 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -418,6 +418,8 @@ type BackendConfig struct { // the minimum value is 128. MaxOpenFiles int KeyspaceName string + // the scope when pause PD schedulers. + PausePDSchedulerScope config.PausePDSchedulerScope } // NewBackendConfig creates a new BackendConfig. @@ -440,6 +442,7 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string) ShouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0, MaxOpenFiles: maxOpenFiles, KeyspaceName: keyspaceName, + PausePDSchedulerScope: cfg.TikvImporter.PausePDSchedulerScope, } } @@ -1413,7 +1416,8 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re return err } - if len(regionRanges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() { + if len(regionRanges) > 0 && local.PausePDSchedulerScope == config.PausePDSchedulerScopeTable { + log.FromContext(ctx).Info("pause pd scheduler of table scope") subCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/br/pkg/lightning/config/BUILD.bazel b/br/pkg/lightning/config/BUILD.bazel index 98bc45e961438..c541a5c4dfe27 100644 --- a/br/pkg/lightning/config/BUILD.bazel +++ b/br/pkg/lightning/config/BUILD.bazel @@ -41,7 +41,7 @@ go_test( "configlist_test.go", ], flaky = True, - shard_count = 44, + shard_count = 45, deps = [ ":config", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 02cc92501d71d..3837a88d086bb 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -427,6 +427,33 @@ func (cfg *MaxError) UnmarshalTOML(v interface{}) error { } } +// PausePDSchedulerScope the scope when pausing pd schedulers. +type PausePDSchedulerScope string + +// constants for PausePDSchedulerScope. +const ( + // PausePDSchedulerScopeTable pause scheduler by adding schedule=deny label to target key range of the table. + PausePDSchedulerScopeTable PausePDSchedulerScope = "table" + // PausePDSchedulerScopeGlobal pause scheduler by remove global schedulers. + // schedulers removed includes: + // - balance-leader-scheduler + // - balance-hot-region-scheduler + // - balance-region-scheduler + // - shuffle-leader-scheduler + // - shuffle-region-scheduler + // - shuffle-hot-region-scheduler + // and we also set configs below: + // - max-merge-region-keys = 0 + // - max-merge-region-size = 0 + // - leader-schedule-limit = min(40, * ) + // - region-schedule-limit = min(40, * ) + // - max-snapshot-count = min(40, * ) + // - enable-location-replacement = false + // - max-pending-peer-count = math.MaxInt32 + // see br/pkg/pdutil/pd.go for more detail. + PausePDSchedulerScopeGlobal PausePDSchedulerScope = "global" +) + // DuplicateResolutionAlgorithm is the config type of how to resolve duplicates. type DuplicateResolutionAlgorithm int @@ -730,6 +757,8 @@ type TikvImporter struct { EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"` + // default is PausePDSchedulerScopeTable to compatible with previous version(>= 6.1) + PausePDSchedulerScope PausePDSchedulerScope `toml:"pause-pd-scheduler-scope" json:"pause-pd-scheduler-scope"` } // Checkpoint is the config for checkpoint. @@ -915,13 +944,14 @@ func NewConfig() *Config { DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace), }, TikvImporter: TikvImporter{ - Backend: "", - OnDuplicate: ReplaceOnDup, - MaxKVPairs: 4096, - SendKVPairs: KVWriteBatchSize, - RegionSplitSize: 0, - DiskQuota: ByteSize(math.MaxInt64), - DuplicateResolution: DupeResAlgNone, + Backend: "", + OnDuplicate: ReplaceOnDup, + MaxKVPairs: 4096, + SendKVPairs: KVWriteBatchSize, + RegionSplitSize: 0, + DiskQuota: ByteSize(math.MaxInt64), + DuplicateResolution: DupeResAlgNone, + PausePDSchedulerScope: PausePDSchedulerScopeTable, }, PostRestore: PostRestore{ Checksum: OpLevelRequired, @@ -1122,6 +1152,13 @@ func (cfg *Config) Adjust(ctx context.Context) error { } } + lowerCaseScope := strings.ToLower(string(cfg.TikvImporter.PausePDSchedulerScope)) + cfg.TikvImporter.PausePDSchedulerScope = PausePDSchedulerScope(lowerCaseScope) + if cfg.TikvImporter.PausePDSchedulerScope != PausePDSchedulerScopeTable && + cfg.TikvImporter.PausePDSchedulerScope != PausePDSchedulerScopeGlobal { + return common.ErrInvalidConfig.GenWithStack("pause-pd-scheduler-scope is invalid, allowed value include: table, global") + } + if err := cfg.CheckAndAdjustTiDBPort(ctx, mustHaveInternalConnections); err != nil { return err } diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index bf2feed43c9bc..4fc00899ec784 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -83,6 +83,38 @@ func TestAdjustPdAddrAndPort(t *testing.T) { require.Equal(t, "123.45.67.89:1234", cfg.TiDB.PdAddr) } +func TestPausePDSchedulerScope(t *testing.T) { + ts, host, port := startMockServer(t, http.StatusOK, + `{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`, + ) + defer ts.Close() + tmpDir := t.TempDir() + + cfg := config.NewConfig() + cfg.TiDB.Host = host + cfg.TiDB.StatusPort = port + cfg.TikvImporter.Backend = config.BackendLocal + cfg.TikvImporter.SortedKVDir = "test" + cfg.Mydumper.SourceDir = tmpDir + require.Equal(t, config.PausePDSchedulerScopeTable, cfg.TikvImporter.PausePDSchedulerScope) + + cfg.TikvImporter.PausePDSchedulerScope = "" + err := cfg.Adjust(context.Background()) + require.ErrorContains(t, err, "pause-pd-scheduler-scope is invalid") + + cfg.TikvImporter.PausePDSchedulerScope = "xxx" + err = cfg.Adjust(context.Background()) + require.ErrorContains(t, err, "pause-pd-scheduler-scope is invalid") + + cfg.TikvImporter.PausePDSchedulerScope = "TABLE" + require.NoError(t, cfg.Adjust(context.Background())) + require.Equal(t, config.PausePDSchedulerScopeTable, cfg.TikvImporter.PausePDSchedulerScope) + + cfg.TikvImporter.PausePDSchedulerScope = "globAL" + require.NoError(t, cfg.Adjust(context.Background())) + require.Equal(t, config.PausePDSchedulerScopeGlobal, cfg.TikvImporter.PausePDSchedulerScope) +} + func TestAdjustPdAddrAndPortViaAdvertiseAddr(t *testing.T) { ts, host, port := startMockServer(t, http.StatusOK, `{"port":6666,"advertise-address":"121.212.121.212:5555","path":"34.34.34.34:3434"}`, diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index cad55491c02b9..78eaab1e7ec9e 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -1507,8 +1507,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { err error ) - if !rc.taskMgr.CanPauseSchedulerByKeyRange() { - logTask.Info("removing PD leader®ion schedulers") + if rc.cfg.TikvImporter.PausePDSchedulerScope == config.PausePDSchedulerScopeGlobal { + logTask.Info("pause pd scheduler of global scope") restoreFn, err = rc.taskMgr.CheckAndPausePdSchedulers(ctx) if err != nil { @@ -2183,6 +2183,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs() } } + if rc.cfg.TikvImporter.PausePDSchedulerScope == config.PausePDSchedulerScopeTable && + !rc.taskMgr.CanPauseSchedulerByKeyRange() { + return errors.New("target cluster don't support pause-pd-scheduler-scope=table, the minimal version required is 6.1.0") + } if rc.cfg.App.CheckRequirements { needCheck := true if rc.cfg.Checkpoint.Enable { diff --git a/br/tests/lightning_csv/config-pause-global.toml b/br/tests/lightning_csv/config-pause-global.toml new file mode 100644 index 0000000000000..86cd6e3721762 --- /dev/null +++ b/br/tests/lightning_csv/config-pause-global.toml @@ -0,0 +1,13 @@ +[mydumper.csv] +separator = ',' +delimiter = '"' +header = false +not-null = false +null = '\N' +backslash-escape = true +trim-last-separator = false + +[tikv-importer] +send-kv-pairs=10 +region-split-size = 1024 +pause-pd-scheduler-scope = "global" diff --git a/br/tests/lightning_csv/run.sh b/br/tests/lightning_csv/run.sh index 682bc55b08e26..2f35fd9eb7950 100755 --- a/br/tests/lightning_csv/run.sh +++ b/br/tests/lightning_csv/run.sh @@ -2,14 +2,16 @@ set -eu -for BACKEND in tidb local; do - if [ "$BACKEND" = 'local' ]; then +function run_with() { + backend=$1 + config_file=$2 + if [ "$backend" = 'local' ]; then check_cluster_version 4 0 0 'local backend' || continue fi run_sql 'DROP DATABASE IF EXISTS csv' - run_lightning --backend $BACKEND + run_lightning --backend $backend --config $config_file run_sql 'SELECT count(*), sum(PROCESSLIST_TIME), sum(THREAD_OS_ID), count(PROCESSLIST_STATE) FROM csv.threads' check_contains 'count(*): 43' @@ -39,8 +41,25 @@ for BACKEND in tidb local; do check_contains 'id: 3' run_sql 'SELECT id FROM csv.empty_strings WHERE b <> ""' check_not_contains 'id:' +} -done +rm -rf $TEST_DIR/lightning.log +run_with "local" "tests/$TEST_NAME/config-pause-global.toml" +grep -F 'pause pd scheduler of global scope' $TEST_DIR/lightning.log +if grep -F 'pause pd scheduler of table scope' $TEST_DIR/lightning.log; then + echo "should not contain 'table scope'" + exit 1 +fi + +rm -rf $TEST_DIR/lightning.log +run_with "local" "tests/$TEST_NAME/config.toml" +grep -F 'pause pd scheduler of table scope' $TEST_DIR/lightning.log +if grep -F 'pause pd scheduler of global scope' $TEST_DIR/lightning.log; then + echo "should not contain 'global scope'" + exit 1 +fi + +run_with "tidb" "tests/$TEST_NAME/config.toml" set +e run_lightning --backend local -d "tests/$TEST_NAME/errData" --log-file "$TEST_DIR/lightning-err.log" 2>/dev/null @@ -48,4 +67,4 @@ set -e # err content presented grep ",7,8" "$TEST_DIR/lightning-err.log" # pos should not set to end -grep "[\"syntax error\"] [pos=22]" "$TEST_DIR/lightning-err.log" \ No newline at end of file +grep "[\"syntax error\"] [pos=22]" "$TEST_DIR/lightning-err.log" diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index c0ab7b9a1d7a5..78aae4fb11e2a 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -130,6 +130,7 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableIm ShouldCheckWriteStall: true, MaxOpenFiles: int(util.GenRLimit()), KeyspaceName: keySpaceName, + PausePDSchedulerScope: config.PausePDSchedulerScopeTable, } tableMeta := &mydump.MDTableMeta{