Skip to content

Commit

Permalink
lightning: add option to control pause scheduler scope (#43762)
Browse files Browse the repository at this point in the history
close #43657
  • Loading branch information
D3Hunter authored May 15, 2023
1 parent 50b6987 commit 6043234
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 16 deletions.
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ type BackendConfig struct {
// the minimum value is 128.
MaxOpenFiles int
KeyspaceName string
// the scope to use when pausing PD schedulers.
PausePDSchedulerScope config.PausePDSchedulerScope
}

// NewBackendConfig creates a new BackendConfig.
Expand All @@ -437,6 +439,7 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string)
ShouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
MaxOpenFiles: maxOpenFiles,
KeyspaceName: keyspaceName,
PausePDSchedulerScope: cfg.TikvImporter.PausePDSchedulerScope,
}
}

Expand Down Expand Up @@ -1410,7 +1413,8 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re
return err
}

if len(regionRanges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
if len(regionRanges) > 0 && local.PausePDSchedulerScope == config.PausePDSchedulerScopeTable {
log.FromContext(ctx).Info("pause pd scheduler of table scope")
subCtx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ go_test(
"configlist_test.go",
],
flaky = True,
shard_count = 44,
shard_count = 45,
deps = [
":config",
"//br/pkg/lightning/common",
Expand Down
51 changes: 44 additions & 7 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,33 @@ func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
}
}

// PausePDSchedulerScope is the scope used when pausing PD schedulers.
type PausePDSchedulerScope string

// Constants for PausePDSchedulerScope.
const (
// PausePDSchedulerScopeTable pauses scheduling by adding a schedule=deny label to the target key range of the table.
PausePDSchedulerScopeTable PausePDSchedulerScope = "table"
// PausePDSchedulerScopeGlobal pauses scheduling by removing global schedulers.
// The schedulers removed include:
// - balance-leader-scheduler
// - balance-hot-region-scheduler
// - balance-region-scheduler
// - shuffle-leader-scheduler
// - shuffle-region-scheduler
// - shuffle-hot-region-scheduler
// The following configs are also set:
// - max-merge-region-keys = 0
// - max-merge-region-size = 0
// - leader-schedule-limit = min(40, <store-count> * <current value of leader-schedule-limit>)
// - region-schedule-limit = min(40, <store-count> * <current value of region-schedule-limit>)
// - max-snapshot-count = min(40, <store-count> * <current value of max-snapshot-count>)
// - enable-location-replacement = false
// - max-pending-peer-count = math.MaxInt32
// See br/pkg/pdutil/pd.go for more detail.
PausePDSchedulerScopeGlobal PausePDSchedulerScope = "global"
)

// DuplicateResolutionAlgorithm is the config type of how to resolve duplicates.
type DuplicateResolutionAlgorithm int

Expand Down Expand Up @@ -730,6 +757,8 @@ type TikvImporter struct {
EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"`
// defaults to PausePDSchedulerScopeTable to stay compatible with previous versions (>= 6.1)
PausePDSchedulerScope PausePDSchedulerScope `toml:"pause-pd-scheduler-scope" json:"pause-pd-scheduler-scope"`
}

// Checkpoint is the config for checkpoint.
Expand Down Expand Up @@ -915,13 +944,14 @@ func NewConfig() *Config {
DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace),
},
TikvImporter: TikvImporter{
Backend: "",
OnDuplicate: ReplaceOnDup,
MaxKVPairs: 4096,
SendKVPairs: KVWriteBatchSize,
RegionSplitSize: 0,
DiskQuota: ByteSize(math.MaxInt64),
DuplicateResolution: DupeResAlgNone,
Backend: "",
OnDuplicate: ReplaceOnDup,
MaxKVPairs: 4096,
SendKVPairs: KVWriteBatchSize,
RegionSplitSize: 0,
DiskQuota: ByteSize(math.MaxInt64),
DuplicateResolution: DupeResAlgNone,
PausePDSchedulerScope: PausePDSchedulerScopeTable,
},
PostRestore: PostRestore{
Checksum: OpLevelRequired,
Expand Down Expand Up @@ -1122,6 +1152,13 @@ func (cfg *Config) Adjust(ctx context.Context) error {
}
}

lowerCaseScope := strings.ToLower(string(cfg.TikvImporter.PausePDSchedulerScope))
cfg.TikvImporter.PausePDSchedulerScope = PausePDSchedulerScope(lowerCaseScope)
if cfg.TikvImporter.PausePDSchedulerScope != PausePDSchedulerScopeTable &&
cfg.TikvImporter.PausePDSchedulerScope != PausePDSchedulerScopeGlobal {
return common.ErrInvalidConfig.GenWithStack("pause-pd-scheduler-scope is invalid, allowed value include: table, global")
}

if err := cfg.CheckAndAdjustTiDBPort(ctx, mustHaveInternalConnections); err != nil {
return err
}
Expand Down
32 changes: 32 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,38 @@ func TestAdjustPdAddrAndPort(t *testing.T) {
require.Equal(t, "123.45.67.89:1234", cfg.TiDB.PdAddr)
}

// TestPausePDSchedulerScope checks the default value of
// TikvImporter.PausePDSchedulerScope and how Config.Adjust treats it:
// empty or unknown values are rejected, while "table" and "global" are
// accepted case-insensitively and normalized to the canonical constants.
func TestPausePDSchedulerScope(t *testing.T) {
	mockServer, tidbHost, statusPort := startMockServer(t, http.StatusOK,
		`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
	)
	defer mockServer.Close()
	sourceDir := t.TempDir()

	cfg := config.NewConfig()
	cfg.TiDB.Host = tidbHost
	cfg.TiDB.StatusPort = statusPort
	cfg.TikvImporter.Backend = config.BackendLocal
	cfg.TikvImporter.SortedKVDir = "test"
	cfg.Mydumper.SourceDir = sourceDir

	// A freshly created config uses the table scope.
	require.Equal(t, config.PausePDSchedulerScopeTable, cfg.TikvImporter.PausePDSchedulerScope)

	// The same cfg is reused for every case below, in this exact order.
	rejected := []config.PausePDSchedulerScope{"", "xxx"}
	for _, scope := range rejected {
		cfg.TikvImporter.PausePDSchedulerScope = scope
		err := cfg.Adjust(context.Background())
		require.ErrorContains(t, err, "pause-pd-scheduler-scope is invalid")
	}

	accepted := []struct {
		input config.PausePDSchedulerScope
		want  config.PausePDSchedulerScope
	}{
		{input: "TABLE", want: config.PausePDSchedulerScopeTable},
		{input: "globAL", want: config.PausePDSchedulerScopeGlobal},
	}
	for _, tc := range accepted {
		cfg.TikvImporter.PausePDSchedulerScope = tc.input
		require.NoError(t, cfg.Adjust(context.Background()))
		require.Equal(t, tc.want, cfg.TikvImporter.PausePDSchedulerScope)
	}
}

func TestAdjustPdAddrAndPortViaAdvertiseAddr(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":6666,"advertise-address":"121.212.121.212:5555","path":"34.34.34.34:3434"}`,
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,8 +1495,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
err error
)

if !rc.taskMgr.CanPauseSchedulerByKeyRange() {
logTask.Info("removing PD leader&region schedulers")
if rc.cfg.TikvImporter.PausePDSchedulerScope == config.PausePDSchedulerScopeGlobal {
logTask.Info("pause pd scheduler of global scope")

restoreFn, err = rc.taskMgr.CheckAndPausePdSchedulers(ctx)
if err != nil {
Expand Down Expand Up @@ -2171,6 +2171,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
}
}
if rc.cfg.TikvImporter.PausePDSchedulerScope == config.PausePDSchedulerScopeTable &&
!rc.taskMgr.CanPauseSchedulerByKeyRange() {
return errors.New("target cluster don't support pause-pd-scheduler-scope=table, the minimal version required is 6.1.0")
}
if rc.cfg.App.CheckRequirements {
needCheck := true
if rc.cfg.Checkpoint.Enable {
Expand Down
13 changes: 13 additions & 0 deletions br/tests/lightning_csv/config-pause-global.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[mydumper.csv]
separator = ','
delimiter = '"'
header = false
not-null = false
null = '\N'
backslash-escape = true
trim-last-separator = false

[tikv-importer]
send-kv-pairs=10
region-split-size = 1024
pause-pd-scheduler-scope = "global"
29 changes: 24 additions & 5 deletions br/tests/lightning_csv/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

set -eu

for BACKEND in tidb local; do
if [ "$BACKEND" = 'local' ]; then
function run_with() {
backend=$1
config_file=$2
if [ "$backend" = 'local' ]; then
check_cluster_version 4 0 0 'local backend' || continue
fi

run_sql 'DROP DATABASE IF EXISTS csv'

run_lightning --backend $BACKEND
run_lightning --backend $backend --config $config_file

run_sql 'SELECT count(*), sum(PROCESSLIST_TIME), sum(THREAD_OS_ID), count(PROCESSLIST_STATE) FROM csv.threads'
check_contains 'count(*): 43'
Expand Down Expand Up @@ -39,13 +41,30 @@ for BACKEND in tidb local; do
check_contains 'id: 3'
run_sql 'SELECT id FROM csv.empty_strings WHERE b <> ""'
check_not_contains 'id:'
}

done
rm -rf $TEST_DIR/lightning.log
run_with "local" "tests/$TEST_NAME/config-pause-global.toml"
grep -F 'pause pd scheduler of global scope' $TEST_DIR/lightning.log
if grep -F 'pause pd scheduler of table scope' $TEST_DIR/lightning.log; then
echo "should not contain 'table scope'"
exit 1
fi

rm -rf $TEST_DIR/lightning.log
run_with "local" "tests/$TEST_NAME/config.toml"
grep -F 'pause pd scheduler of table scope' $TEST_DIR/lightning.log
if grep -F 'pause pd scheduler of global scope' $TEST_DIR/lightning.log; then
echo "should not contain 'global scope'"
exit 1
fi

run_with "tidb" "tests/$TEST_NAME/config.toml"

set +e
run_lightning --backend local -d "tests/$TEST_NAME/errData" --log-file "$TEST_DIR/lightning-err.log" 2>/dev/null
set -e
# err content presented
grep ",7,8" "$TEST_DIR/lightning-err.log"
# pos should not set to end
grep "[\"syntax error\"] [pos=22]" "$TEST_DIR/lightning-err.log"
grep "[\"syntax error\"] [pos=22]" "$TEST_DIR/lightning-err.log"
1 change: 1 addition & 0 deletions executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableIm
ShouldCheckWriteStall: true,
MaxOpenFiles: int(util.GenRLimit()),
KeyspaceName: keySpaceName,
PausePDSchedulerScope: config.PausePDSchedulerScopeTable,
}

tableMeta := &mydump.MDTableMeta{
Expand Down

0 comments on commit 6043234

Please sign in to comment.