From 8c208d24f3d886b3122510e4a97ac6f29b3c340f Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 28 Nov 2022 20:34:54 +0800 Subject: [PATCH 1/3] add flag parameter for concurrency for pitr Signed-off-by: joccau --- br/pkg/task/restore.go | 49 +++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index c5a3871bd2ae5..378ad60d5e798 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -58,18 +58,19 @@ const ( // FlagStreamFullBackupStorage is used for log restore, represents the full backup storage. FlagStreamFullBackupStorage = "full-backup-storage" // FlagPiTRBatchCount and FlagPiTRBatchSize are used for restore log with batch method. - FlagPiTRBatchCount = "pitr-batch-count" - FlagPiTRBatchSize = "pitr-batch-size" - - defaultPiTRBatchCount = 16 - defaultPiTRBatchSize = 32 * 1024 * 1024 - defaultRestoreConcurrency = 128 - defaultRestoreStreamConcurrency = 128 - maxRestoreBatchSizeLimit = 10240 - defaultPDConcurrency = 1 - defaultBatchFlushInterval = 16 * time.Second - defaultFlagDdlBatchSize = 128 - resetSpeedLimitRetryTimes = 3 + FlagPiTRBatchCount = "pitr-batch-count" + FlagPiTRBatchSize = "pitr-batch-size" + FlagPiTRConcurrency = "pitr-concurrency" + + defaultPiTRBatchCount = 8 + defaultPiTRBatchSize = 16 * 1024 * 1024 + defaultRestoreConcurrency = 128 + defaultPiTRConcurrency = 16 + maxRestoreBatchSizeLimit = 10240 + defaultPDConcurrency = 1 + defaultBatchFlushInterval = 16 * time.Second + defaultFlagDdlBatchSize = 128 + resetSpeedLimitRetryTimes = 3 ) const ( @@ -175,6 +176,7 @@ type RestoreConfig struct { tiflashRecorder *tiflashrec.TiFlashRecorder `json:"-" toml:"-"` PitrBatchCount uint32 `json:"pitr-batch-count" toml:"pitr-batch-count"` PitrBatchSize uint32 `json:"pitr-batch-size" toml:"pitr-batch-size"` + PitrConcurrency uint32 `json:"-" toml:"-"` // for ebs-based restore FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"` @@ -206,10 +208,12 @@ func DefineStreamRestoreFlags(command *cobra.Command) { "support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'") command.Flags().String(FlagStreamFullBackupStorage, "", "specify the backup full storage. "+ "fill it if want restore full backup before restore log.") - command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "") - command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "") + command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "specify the batch count to restore log.") + command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "specify the batch size to retore log.") + command.Flags().Uint32(FlagPiTRConcurrency, defaultPiTRConcurrency, "specify the concurrency to restore log.") _ = command.Flags().MarkHidden(FlagPiTRBatchCount) _ = command.Flags().MarkHidden(FlagPiTRBatchSize) + _ = command.Flags().MarkHidden(FlagPiTRConcurrency) } // ParseStreamRestoreFlags parses the `restore stream` flags from the flag set. @@ -244,6 +248,9 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error { if cfg.PitrBatchSize, err = flags.GetUint32(FlagPiTRBatchSize); err != nil { return errors.Trace(err) } + if cfg.PitrConcurrency, err = flags.GetUint32(FlagPiTRConcurrency); err != nil { + return errors.Trace(err) + } return nil } @@ -370,10 +377,18 @@ func (cfg *RestoreConfig) Adjust() { } func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() { - if cfg.Config.Concurrency == 0 { - log.Info("set restore kv files concurrency", zap.Int("concurrency", defaultRestoreStreamConcurrency)) - cfg.Config.Concurrency = defaultRestoreStreamConcurrency + if cfg.PitrConcurrency == 0 { + cfg.PitrConcurrency = defaultPiTRConcurrency } + if cfg.PitrBatchCount == 0 { + cfg.PitrBatchCount = defaultPiTRBatchCount + } + if cfg.PitrBatchSize == 0 { + cfg.PitrBatchSize = defaultPiTRBatchSize + } + + log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency))) + cfg.Config.Concurrency = cfg.PitrConcurrency } func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *RestoreConfig) error { From 3cc8022a67d571e9e4769811ab0b36c753d4035b Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 28 Nov 2022 20:57:05 +0800 Subject: [PATCH 2/3] do not hide pitr parameters Signed-off-by: joccau --- br/cmd/br/restore.go | 1 - br/pkg/task/restore.go | 3 --- 2 files changed, 4 deletions(-) diff --git a/br/cmd/br/restore.go b/br/cmd/br/restore.go index 5f91bee91c6a9..e826df0e59e77 100644 --- a/br/cmd/br/restore.go +++ b/br/cmd/br/restore.go @@ -199,6 +199,5 @@ func newStreamRestoreCommand() *cobra.Command { } task.DefineFilterFlags(command, filterOutSysAndMemTables, true) task.DefineStreamRestoreFlags(command) - command.Hidden = true return command } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 378ad60d5e798..83c22a29e61db 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -211,9 +211,6 @@ func DefineStreamRestoreFlags(command *cobra.Command) { command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "specify the batch count to restore log.") command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "specify the batch size to retore log.") command.Flags().Uint32(FlagPiTRConcurrency, defaultPiTRConcurrency, "specify the concurrency to restore log.") - _ = command.Flags().MarkHidden(FlagPiTRBatchCount) - _ = command.Flags().MarkHidden(FlagPiTRBatchSize) - _ = command.Flags().MarkHidden(FlagPiTRConcurrency) } // ParseStreamRestoreFlags parses the `restore stream` flags from the flag set. From 0288b38d76904ba09ccf1cb2ce47bba7d0c5f629 Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 28 Nov 2022 21:20:46 +0800 Subject: [PATCH 3/3] add test case Signed-off-by: joccau --- br/pkg/task/restore_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 94bbcb3c3692c..b13ecf0eccc08 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -63,6 +63,16 @@ func TestConfigureRestoreClient(t *testing.T) { require.True(t, client.IsOnline()) } +func TestAdjustRestoreConfigForStreamRestore(t *testing.T) { + restoreCfg := RestoreConfig{} + + restoreCfg.adjustRestoreConfigForStreamRestore() + require.Equal(t, restoreCfg.PitrBatchCount, uint32(defaultPiTRBatchCount)) + require.Equal(t, restoreCfg.PitrBatchSize, uint32(defaultPiTRBatchSize)) + require.Equal(t, restoreCfg.PitrConcurrency, uint32(defaultPiTRConcurrency)) + require.Equal(t, restoreCfg.Concurrency, restoreCfg.PitrConcurrency) +} + func TestCheckRestoreDBAndTable(t *testing.T) { cases := []struct { cfgSchemas map[string]struct{}