diff --git a/br/cmd/br/backup.go b/br/cmd/br/backup.go index 925cf58c132b4..6525c85d535b9 100644 --- a/br/cmd/br/backup.go +++ b/br/cmd/br/backup.go @@ -22,10 +22,11 @@ import ( func runBackupCommand(command *cobra.Command, cmdName string) error { cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { + if err := cfg.ParseFromFlags(command.Flags(), false); err != nil { command.SilenceUsage = false return errors.Trace(err) } + overrideDefaultBackupConfigIfNeeded(&cfg, command) if err := metricsutil.RegisterMetricsForBR(cfg.PD, cfg.KeyspaceName); err != nil { return errors.Trace(err) @@ -211,3 +212,10 @@ func newTxnBackupCommand() *cobra.Command { task.DefineTxnBackupFlags(command) return command } + +func overrideDefaultBackupConfigIfNeeded(config *task.BackupConfig, cmd *cobra.Command) { + // override only if flag not set by user + if !cmd.Flags().Changed(task.FlagChecksum) { + config.Checksum = false + } +} diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index df0395fa1d719..afda0a6c26473 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -81,8 +81,8 @@ func timestampLogFileName() string { return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700")) } -// AddFlags adds flags to the given cmd. -func AddFlags(cmd *cobra.Command) { +// DefineCommonFlags defines the common flags for all BR cmd operation. +func DefineCommonFlags(cmd *cobra.Command) { cmd.Version = build.Info() cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR") cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n") @@ -99,6 +99,8 @@ func AddFlags(cmd *cobra.Command) { "Set whether to redact sensitive info in log") cmd.PersistentFlags().String(FlagStatusAddr, "", "Set the HTTP listening address for the status report service. Set to empty string to disable") + + // defines BR task common flags, this is shared by cmd and sql(brie) task.DefineCommonFlags(cmd.PersistentFlags()) cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "", diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index f745920f5bfba..cad081606a0ea 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -20,7 +20,7 @@ func main() { TraverseChildren: true, SilenceUsage: true, } - AddFlags(rootCmd) + DefineCommonFlags(rootCmd) SetDefaultContext(ctx) rootCmd.AddCommand( NewDebugCommand(), diff --git a/br/cmd/br/restore.go b/br/cmd/br/restore.go index 916ed3b703933..820bf1abf505d 100644 --- a/br/cmd/br/restore.go +++ b/br/cmd/br/restore.go @@ -25,7 +25,7 @@ import ( func runRestoreCommand(command *cobra.Command, cmdName string) error { cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { + if err := cfg.ParseFromFlags(command.Flags(), false); err != nil { command.SilenceUsage = false return errors.Trace(err) } diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go index bd33b29d70240..7b1640b0d30e6 100644 --- a/br/pkg/backup/schema.go +++ b/br/pkg/backup/schema.go @@ -106,7 +106,7 @@ func (ss *Schemas) BackupSchemas( } var checksum *checkpoint.ChecksumItem - var exists bool = false + var exists = false if ss.checkpointChecksum != nil && schema.tableInfo != nil { checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID] } @@ -145,7 +145,7 @@ func (ss *Schemas) BackupSchemas( zap.Uint64("Crc64Xor", schema.crc64xor), zap.Uint64("TotalKvs", schema.totalKvs), zap.Uint64("TotalBytes", schema.totalBytes), - zap.Duration("calculate-take", calculateCost)) + zap.Duration("TimeTaken", calculateCost)) } } if statsHandle != nil { diff --git a/br/pkg/metautil/metafile.go b/br/pkg/metautil/metafile.go index 03cc95ca1b5de..814b5d75d194b 100644 --- a/br/pkg/metautil/metafile.go +++ b/br/pkg/metautil/metafile.go @@ -171,11 +171,6 @@ type Table struct { StatsFileIndexes []*backuppb.StatsFileIndex } -// NoChecksum checks whether the table has a calculated checksum. -func (tbl *Table) NoChecksum() bool { - return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0 -} - // MetaReader wraps a reader to read both old and new version of backupmeta. type MetaReader struct { storage storage.ExternalStorage @@ -240,7 +235,7 @@ func (reader *MetaReader) readDataFiles(ctx context.Context, output func(*backup } // ArchiveSize return the size of Archive data -func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 { +func ArchiveSize(files []*backuppb.File) uint64 { total := uint64(0) for _, file := range files { total += file.Size_ @@ -248,6 +243,30 @@ func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 return total } +type ChecksumStats struct { + Crc64Xor uint64 + TotalKvs uint64 + TotalBytes uint64 +} + +func (stats ChecksumStats) ChecksumExists() bool { + if stats.Crc64Xor == 0 && stats.TotalKvs == 0 && stats.TotalBytes == 0 { + return false + } + return true +} + +// CalculateChecksumStatsOnFiles returns the ChecksumStats for the given files +func CalculateChecksumStatsOnFiles(files []*backuppb.File) ChecksumStats { + var stats ChecksumStats + for _, file := range files { + stats.Crc64Xor ^= file.Crc64Xor + stats.TotalKvs += file.TotalKvs + stats.TotalBytes += file.TotalBytes + } + return stats +} + // ReadDDLs reads the ddls from the backupmeta. // This function is compatible with the old backupmeta. func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) { diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index bd82ee84f0f3d..08ff8d85c25ef 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1786,7 +1786,7 @@ func (rc *Client) GoValidateChecksum( elapsed := time.Since(start) summary.CollectSuccessUnit("table checksum", 1, elapsed) }() - err := rc.execChecksum(c, tbl, kvClient, concurrency) + err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency) if err != nil { return errors.Trace(err) } @@ -1798,7 +1798,7 @@ func (rc *Client) GoValidateChecksum( return outCh } -func (rc *Client) execChecksum( +func (rc *Client) execAndValidateChecksum( ctx context.Context, tbl *CreatedTable, kvClient kv.Client, @@ -1809,13 +1809,14 @@ func (rc *Client) execChecksum( zap.String("table", tbl.OldTable.Info.Name.O), ) - if tbl.OldTable.NoChecksum() { + expectedChecksumStats := metautil.CalculateChecksumStatsOnFiles(tbl.OldTable.Files) + if !expectedChecksumStats.ChecksumExists() { logger.Warn("table has no checksum, skipping checksum") return nil } if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Client.execChecksum", opentracing.ChildOf(span.Context())) + span1 := span.Tracer().StartSpan("Client.execAndValidateChecksum", opentracing.ChildOf(span.Context())) defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } @@ -1855,21 +1856,24 @@ func (rc *Client) execChecksum( } } } - table := tbl.OldTable - if item.Crc64xor != table.Crc64Xor || - item.TotalKvs != table.TotalKvs || - item.TotalBytes != table.TotalBytes { + checksumMatch := item.Crc64xor == expectedChecksumStats.Crc64Xor && + item.TotalKvs == expectedChecksumStats.TotalKvs && + item.TotalBytes == expectedChecksumStats.TotalBytes + failpoint.Inject("full-restore-validate-checksum", func(_ failpoint.Value) { + checksumMatch = false + }) + if !checksumMatch { logger.Error("failed in validate checksum", - zap.Uint64("origin tidb crc64", table.Crc64Xor), + zap.Uint64("expected tidb crc64", expectedChecksumStats.Crc64Xor), zap.Uint64("calculated crc64", item.Crc64xor), - zap.Uint64("origin tidb total kvs", table.TotalKvs), + zap.Uint64("expected tidb total kvs", expectedChecksumStats.TotalKvs), zap.Uint64("calculated total kvs", item.TotalKvs), - zap.Uint64("origin tidb total bytes", table.TotalBytes), + zap.Uint64("expected tidb total bytes", expectedChecksumStats.TotalBytes), zap.Uint64("calculated total bytes", item.TotalBytes), ) return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum") } - logger.Info("success in validate checksum") + logger.Info("success in validating checksum") return nil } diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 915bdb2092bd9..642a51017d6f7 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -41,7 +41,6 @@ import ( "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" kvutil "github.com/tikv/client-go/v2/util" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -159,7 +158,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) { } // ParseFromFlags parses the backup-related flags from the flag set. -func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { +func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error { timeAgo, err := flags.GetDuration(flagBackupTimeago) if err != nil { return errors.Trace(err) @@ -212,9 +211,13 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { } cfg.CompressionConfig = *compressionCfg - if err = cfg.Config.ParseFromFlags(flags); err != nil { - return errors.Trace(err) + // parse common flags if needed + if !skipCommonConfig { + if err = cfg.Config.ParseFromFlags(flags); err != nil { + return errors.Trace(err) + } } + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) if err != nil { return errors.Trace(err) @@ -789,18 +792,15 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) { return oracle.GoTimeToTS(t1), nil } -func DefaultBackupConfig() BackupConfig { +func DefaultBackupConfig(commonConfig Config) BackupConfig { fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) - DefineCommonFlags(fs) DefineBackupFlags(fs) cfg := BackupConfig{} - err := multierr.Combine( - cfg.ParseFromFlags(fs), - cfg.Config.ParseFromFlags(fs), - ) + err := cfg.ParseFromFlags(fs, true) if err != nil { - log.Panic("infallible operation failed.", zap.Error(err)) + log.Panic("failed to parse backup flags to config", zap.Error(err)) } + cfg.Config = commonConfig return cfg } diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index c2432bcbaf042..bad6d54681490 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -64,7 +64,7 @@ const ( flagRateLimit = "ratelimit" flagRateLimitUnit = "ratelimit-unit" flagConcurrency = "concurrency" - flagChecksum = "checksum" + FlagChecksum = "checksum" flagFilter = "filter" flagCaseSensitive = "case-sensitive" flagRemoveTiFlash = "remove-tiflash" @@ -273,7 +273,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of checksumming in one table") flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node") - flags.Bool(flagChecksum, true, "Run checksum at end of task") + flags.Bool(FlagChecksum, true, "Run checksum at end of task") flags.Bool(flagRemoveTiFlash, true, "Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash") @@ -318,7 +318,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) { // HiddenFlagsForStream temporary hidden flags that stream cmd not support. func HiddenFlagsForStream(flags *pflag.FlagSet) { - _ = flags.MarkHidden(flagChecksum) + _ = flags.MarkHidden(FlagChecksum) _ = flags.MarkHidden(flagLoadStats) _ = flags.MarkHidden(flagChecksumConcurrency) _ = flags.MarkHidden(flagRateLimit) @@ -506,7 +506,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { return errors.Trace(err) } - if cfg.Checksum, err = flags.GetBool(flagChecksum); err != nil { + if cfg.Checksum, err = flags.GetBool(FlagChecksum); err != nil { return errors.Trace(err) } if cfg.ChecksumConcurrency, err = flags.GetUint(flagChecksumConcurrency); err != nil { @@ -619,6 +619,11 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { return cfg.normalizePDURLs() } +// OverrideDefaultForBackup override common config for backup tasks +func (cfg *Config) OverrideDefaultForBackup() { + cfg.Checksum = false +} + // NewMgr creates a new mgr at the given PD address. func NewMgr(ctx context.Context, g glue.Glue, pds []string, diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index 79bd235fe1b45..1ad0114764684 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -233,8 +233,10 @@ func expectedDefaultConfig() Config { } func expectedDefaultBackupConfig() BackupConfig { + defaultConfig := expectedDefaultConfig() + defaultConfig.Checksum = false return BackupConfig{ - Config: expectedDefaultConfig(), + Config: defaultConfig, GCTTL: utils.DefaultBRGCSafePointTTL, CompressionConfig: CompressionConfig{ CompressionType: backup.CompressionType_ZSTD, @@ -274,13 +276,16 @@ func TestDefault(t *testing.T) { } func TestDefaultBackup(t *testing.T) { - def := DefaultBackupConfig() + commonConfig := DefaultConfig() + commonConfig.OverrideDefaultForBackup() + def := DefaultBackupConfig(commonConfig) defaultConfig := expectedDefaultBackupConfig() require.Equal(t, defaultConfig, def) } func TestDefaultRestore(t *testing.T) { - def := DefaultRestoreConfig() + commonConfig := DefaultConfig() + def := DefaultRestoreConfig(commonConfig) defaultConfig := expectedDefaultRestoreConfig() require.Equal(t, defaultConfig, def) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 325b98f6a3481..270606ae763e3 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -328,7 +328,7 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error { } // ParseFromFlags parses the restore-related flags from the flag set. -func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { +func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error { var err error cfg.NoSchema, err = flags.GetBool(flagNoSchema) if err != nil { @@ -338,10 +338,15 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - err = cfg.Config.ParseFromFlags(flags) - if err != nil { - return errors.Trace(err) + + // parse common config if needed + if !skipCommonConfig { + err = cfg.Config.ParseFromFlags(flags) + if err != nil { + return errors.Trace(err) + } } + err = cfg.RestoreCommonConfig.ParseFromFlags(flags) if err != nil { return errors.Trace(err) @@ -621,20 +626,16 @@ func removeCheckpointDataForLogRestore(ctx context.Context, storageName string, return errors.Trace(checkpoint.RemoveCheckpointDataForLogRestore(ctx, s, taskName, clusterID)) } -func DefaultRestoreConfig() RestoreConfig { +func DefaultRestoreConfig(commonConfig Config) RestoreConfig { fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) - DefineCommonFlags(fs) DefineRestoreFlags(fs) cfg := RestoreConfig{} - err := multierr.Combine( - cfg.ParseFromFlags(fs), - cfg.RestoreCommonConfig.ParseFromFlags(fs), - cfg.Config.ParseFromFlags(fs), - ) + err := cfg.ParseFromFlags(fs, true) if err != nil { - log.Panic("infallible failed.", zap.Error(err)) + log.Panic("failed to parse restore flags to config", zap.Error(err)) } + cfg.Config = commonConfig return cfg } @@ -786,7 +787,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return errors.Annotate(berrors.ErrRestoreInvalidBackup, "contain tables but no databases") } - archiveSize := reader.ArchiveSize(ctx, files) + archiveSize := metautil.ArchiveSize(files) g.Record(summary.RestoreDataSize, archiveSize) //restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247 g.Record("Size", archiveSize) @@ -1083,8 +1084,9 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf var finish <-chan struct{} postHandleCh := afterTableRestoredCh - // pipeline checksum - if cfg.Checksum { + // pipeline checksum only when enabled and is not incremental snapshot repair mode cuz incremental doesn't have + // enough information in backup meta to validate checksum + if cfg.Checksum && !client.IsIncremental() { postHandleCh = client.GoValidateChecksum( ctx, postHandleCh, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency) } @@ -1099,7 +1101,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf finish = dropToBlackhole(ctx, postHandleCh, errCh) - // Reset speed limit. ResetSpeedLimit must be called after client.InitBackupMeta has been called. + // Reset speed limit. ResetSpeedLimit must be called after client.LoadSchemaIfNeededAndInitClient has been called. defer func() { var resetErr error // In future we may need a mechanism to set speed limit in ttl. like what we do in switchmode. TODO diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 5b9b009853a02..b0ab7866d5f81 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -128,7 +128,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR if err != nil { return errors.Trace(err) } - archiveSize := reader.ArchiveSize(ctx, files) + archiveSize := metautil.ArchiveSize(files) g.Record(summary.RestoreDataSize, archiveSize) if len(files) == 0 { diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 596b1d29d714e..32694588147e1 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -69,7 +69,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) } files := backupMeta.Files - archiveSize := reader.ArchiveSize(ctx, files) + archiveSize := metautil.ArchiveSize(files) g.Record(summary.RestoreDataSize, archiveSize) if len(files) == 0 { diff --git a/br/tests/br_full_ddl/run.sh b/br/tests/br_full_ddl/run.sh index 370d77dca66dd..41f96e91def8a 100755 --- a/br/tests/br_full_ddl/run.sh +++ b/br/tests/br_full_ddl/run.sh @@ -107,7 +107,7 @@ echo "backup start with stats..." unset BR_LOG_TO_TERM cluster_index_before_backup=$(run_sql "show variables like '%cluster%';" | awk '{print $2}') -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --log-file $LOG --ignore-stats=false || cat $LOG +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --log-file $LOG --ignore-stats=false --checksum=true || cat $LOG checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs) if [ "${checksum_count}" -lt "1" ];then diff --git a/br/tests/br_full_index/run.sh b/br/tests/br_full_index/run.sh index edcac1bfa2377..28f959c10b5f4 100755 --- a/br/tests/br_full_index/run.sh +++ b/br/tests/br_full_index/run.sh @@ -41,7 +41,7 @@ echo "backup start..." # Do not log to terminal unset BR_LOG_TO_TERM # do not backup stats to test whether we can restore without stats. -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ignore-stats=true --log-file $LOG || cat $LOG +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ignore-stats=true --log-file $LOG --checksum=true || cat $LOG BR_LOG_TO_TERM=1 checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs) diff --git a/pkg/executor/brie.go b/pkg/executor/brie.go index 1e5316881819e..e0f35f433a2cc 100644 --- a/pkg/executor/brie.go +++ b/pkg/executor/brie.go @@ -282,7 +282,15 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) Key: tidbCfg.Security.ClusterSSLKey, } pds := strings.Split(tidbCfg.Path, ",") + + // build common config and override for specific task if needed cfg := task.DefaultConfig() + switch s.Kind { + case ast.BRIEKindBackup: + cfg.OverrideDefaultForBackup() + default: + } + cfg.PD = pds cfg.TLS = tlsCfg @@ -357,8 +365,7 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) switch s.Kind { case ast.BRIEKindBackup: - bcfg := task.DefaultBackupConfig() - bcfg.Config = cfg + bcfg := task.DefaultBackupConfig(cfg) e.backupCfg = &bcfg for _, opt := range s.Options { @@ -387,8 +394,7 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } case ast.BRIEKindRestore: - rcfg := task.DefaultRestoreConfig() - rcfg.Config = cfg + rcfg := task.DefaultRestoreConfig(cfg) e.restoreCfg = &rcfg for _, opt := range s.Options { if opt.Tp == ast.BRIEOptionOnline {