Skip to content

Commit

Permalink
br: restore checksum shouldn't rely on backup checksum (pingcap#56712)
Browse files Browse the repository at this point in the history
close pingcap#56373

(cherry picked from commit 4f047be)
  • Loading branch information
Tristan1900 committed Dec 12, 2024
1 parent 65fd2ad commit c3b8485
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 55 deletions.
10 changes: 9 additions & 1 deletion br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (

func runBackupCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}
overrideDefaultBackupConfigIfNeeded(&cfg, command)

if err := metricsutil.RegisterMetricsForBR(cfg.PD, cfg.KeyspaceName); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -211,3 +212,10 @@ func newTxnBackupCommand() *cobra.Command {
task.DefineTxnBackupFlags(command)
return command
}

func overrideDefaultBackupConfigIfNeeded(config *task.BackupConfig, cmd *cobra.Command) {
// override only if flag not set by user
if !cmd.Flags().Changed(task.FlagChecksum) {
config.Checksum = false
}
}
6 changes: 4 additions & 2 deletions br/cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func timestampLogFileName() string {
return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700"))
}

// AddFlags adds flags to the given cmd.
func AddFlags(cmd *cobra.Command) {
// DefineCommonFlags defines the common flags for all BR cmd operation.
func DefineCommonFlags(cmd *cobra.Command) {
cmd.Version = build.Info()
cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR")
cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n")
Expand All @@ -99,6 +99,8 @@ func AddFlags(cmd *cobra.Command) {
"Set whether to redact sensitive info in log")
cmd.PersistentFlags().String(FlagStatusAddr, "",
"Set the HTTP listening address for the status report service. Set to empty string to disable")

// defines BR task common flags, this is shared by cmd and sql(brie)
task.DefineCommonFlags(cmd.PersistentFlags())

cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "",
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
TraverseChildren: true,
SilenceUsage: true,
}
AddFlags(rootCmd)
DefineCommonFlags(rootCmd)
SetDefaultContext(ctx)
rootCmd.AddCommand(
NewDebugCommand(),
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func runRestoreCommand(command *cobra.Command, cmdName string) error {
cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (ss *Schemas) BackupSchemas(
}

var checksum *checkpoint.ChecksumItem
var exists bool = false
var exists = false
if ss.checkpointChecksum != nil && schema.tableInfo != nil {
checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID]
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (ss *Schemas) BackupSchemas(
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
zap.Duration("calculate-take", calculateCost))
zap.Duration("TimeTaken", calculateCost))
}
}
if statsHandle != nil {
Expand Down
31 changes: 25 additions & 6 deletions br/pkg/metautil/metafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ type Table struct {
StatsFileIndexes []*backuppb.StatsFileIndex
}

// NoChecksum checks whether the table has a calculated checksum.
func (tbl *Table) NoChecksum() bool {
return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
}

// MetaReader wraps a reader to read both old and new version of backupmeta.
type MetaReader struct {
storage storage.ExternalStorage
Expand Down Expand Up @@ -240,14 +235,38 @@ func (reader *MetaReader) readDataFiles(ctx context.Context, output func(*backup
}

// ArchiveSize return the size of Archive data
func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 {
func ArchiveSize(files []*backuppb.File) uint64 {
total := uint64(0)
for _, file := range files {
total += file.Size_
}
return total
}

type ChecksumStats struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

func (stats ChecksumStats) ChecksumExists() bool {
if stats.Crc64Xor == 0 && stats.TotalKvs == 0 && stats.TotalBytes == 0 {
return false
}
return true
}

// CalculateChecksumStatsOnFiles returns the ChecksumStats for the given files
func CalculateChecksumStatsOnFiles(files []*backuppb.File) ChecksumStats {
var stats ChecksumStats
for _, file := range files {
stats.Crc64Xor ^= file.Crc64Xor
stats.TotalKvs += file.TotalKvs
stats.TotalBytes += file.TotalBytes
}
return stats
}

// ReadDDLs reads the ddls from the backupmeta.
// This function is compatible with the old backupmeta.
func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) {
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/oracle"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -159,7 +158,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
}

// ParseFromFlags parses the backup-related flags from the flag set.
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error {
timeAgo, err := flags.GetDuration(flagBackupTimeago)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -212,9 +211,13 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.CompressionConfig = *compressionCfg

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
// parse common flags if needed
if !skipCommonConfig {
if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
}

cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -789,18 +792,15 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) {
return oracle.GoTimeToTS(t1), nil
}

func DefaultBackupConfig() BackupConfig {
func DefaultBackupConfig(commonConfig Config) BackupConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
DefineBackupFlags(fs)
cfg := BackupConfig{}
err := multierr.Combine(
cfg.ParseFromFlags(fs),
cfg.Config.ParseFromFlags(fs),
)
err := cfg.ParseFromFlags(fs, true)
if err != nil {
log.Panic("infallible operation failed.", zap.Error(err))
log.Panic("failed to parse backup flags to config", zap.Error(err))
}
cfg.Config = commonConfig
return cfg
}

Expand Down
13 changes: 9 additions & 4 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
flagRateLimit = "ratelimit"
flagRateLimitUnit = "ratelimit-unit"
flagConcurrency = "concurrency"
flagChecksum = "checksum"
FlagChecksum = "checksum"
flagFilter = "filter"
flagCaseSensitive = "case-sensitive"
flagRemoveTiFlash = "remove-tiflash"
Expand Down Expand Up @@ -273,7 +273,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of checksumming in one table")

flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node")
flags.Bool(flagChecksum, true, "Run checksum at end of task")
flags.Bool(FlagChecksum, true, "Run checksum at end of task")
flags.Bool(flagRemoveTiFlash, true,
"Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash")

Expand Down Expand Up @@ -318,7 +318,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {

// HiddenFlagsForStream temporary hidden flags that stream cmd not support.
func HiddenFlagsForStream(flags *pflag.FlagSet) {
_ = flags.MarkHidden(flagChecksum)
_ = flags.MarkHidden(FlagChecksum)
_ = flags.MarkHidden(flagLoadStats)
_ = flags.MarkHidden(flagChecksumConcurrency)
_ = flags.MarkHidden(flagRateLimit)
Expand Down Expand Up @@ -506,7 +506,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if cfg.Checksum, err = flags.GetBool(flagChecksum); err != nil {
if cfg.Checksum, err = flags.GetBool(FlagChecksum); err != nil {
return errors.Trace(err)
}
if cfg.ChecksumConcurrency, err = flags.GetUint(flagChecksumConcurrency); err != nil {
Expand Down Expand Up @@ -619,6 +619,11 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return cfg.normalizePDURLs()
}

// OverrideDefaultForBackup override common config for backup tasks
func (cfg *Config) OverrideDefaultForBackup() {
cfg.Checksum = false
}

// NewMgr creates a new mgr at the given PD address.
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
Expand Down
11 changes: 8 additions & 3 deletions br/pkg/task/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ func expectedDefaultConfig() Config {
}

func expectedDefaultBackupConfig() BackupConfig {
defaultConfig := expectedDefaultConfig()
defaultConfig.Checksum = false
return BackupConfig{
Config: expectedDefaultConfig(),
Config: defaultConfig,
GCTTL: utils.DefaultBRGCSafePointTTL,
CompressionConfig: CompressionConfig{
CompressionType: backup.CompressionType_ZSTD,
Expand Down Expand Up @@ -274,13 +276,16 @@ func TestDefault(t *testing.T) {
}

func TestDefaultBackup(t *testing.T) {
def := DefaultBackupConfig()
commonConfig := DefaultConfig()
commonConfig.OverrideDefaultForBackup()
def := DefaultBackupConfig(commonConfig)
defaultConfig := expectedDefaultBackupConfig()
require.Equal(t, defaultConfig, def)
}

func TestDefaultRestore(t *testing.T) {
def := DefaultRestoreConfig()
commonConfig := DefaultConfig()
def := DefaultRestoreConfig(commonConfig)
defaultConfig := expectedDefaultRestoreConfig()
require.Equal(t, defaultConfig, def)
}
34 changes: 18 additions & 16 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error {
}

// ParseFromFlags parses the restore-related flags from the flag set.
func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error {
var err error
cfg.NoSchema, err = flags.GetBool(flagNoSchema)
if err != nil {
Expand All @@ -337,10 +337,15 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
err = cfg.Config.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)

// parse common config if needed
if !skipCommonConfig {
err = cfg.Config.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)
}
}

err = cfg.RestoreCommonConfig.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -620,20 +625,16 @@ func removeCheckpointDataForLogRestore(ctx context.Context, storageName string,
return errors.Trace(checkpoint.RemoveCheckpointDataForLogRestore(ctx, s, taskName, clusterID))
}

func DefaultRestoreConfig() RestoreConfig {
func DefaultRestoreConfig(commonConfig Config) RestoreConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
DefineRestoreFlags(fs)
cfg := RestoreConfig{}
err := multierr.Combine(
cfg.ParseFromFlags(fs),
cfg.RestoreCommonConfig.ParseFromFlags(fs),
cfg.Config.ParseFromFlags(fs),
)
err := cfg.ParseFromFlags(fs, true)
if err != nil {
log.Panic("infallible failed.", zap.Error(err))
log.Panic("failed to parse restore flags to config", zap.Error(err))
}

cfg.Config = commonConfig
return cfg
}

Expand Down Expand Up @@ -785,7 +786,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Annotate(berrors.ErrRestoreInvalidBackup, "contain tables but no databases")
}

archiveSize := reader.ArchiveSize(ctx, files)
archiveSize := metautil.ArchiveSize(files)
g.Record(summary.RestoreDataSize, archiveSize)
//restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247
g.Record("Size", archiveSize)
Expand Down Expand Up @@ -1077,8 +1078,9 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
var finish <-chan struct{}
postHandleCh := afterTableRestoredCh

// pipeline checksum
if cfg.Checksum {
// pipeline checksum only when enabled and is not incremental snapshot repair mode cuz incremental doesn't have
// enough information in backup meta to validate checksum
if cfg.Checksum && !client.IsIncremental() {
postHandleCh = client.GoValidateChecksum(
ctx, postHandleCh, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency)
}
Expand All @@ -1093,7 +1095,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf

finish = dropToBlackhole(ctx, postHandleCh, errCh)

// Reset speed limit. ResetSpeedLimit must be called after client.InitBackupMeta has been called.
// Reset speed limit. ResetSpeedLimit must be called after client.LoadSchemaIfNeededAndInitClient has been called.
defer func() {
var resetErr error
// In future we may need a mechanism to set speed limit in ttl. like what we do in switchmode. TODO
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
if err != nil {
return errors.Trace(err)
}
archiveSize := reader.ArchiveSize(ctx, files)
archiveSize := metautil.ArchiveSize(files)
g.Record(summary.RestoreDataSize, archiveSize)

if len(files) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
}

files := backupMeta.Files
archiveSize := reader.ArchiveSize(ctx, files)
archiveSize := metautil.ArchiveSize(files)
g.Record(summary.RestoreDataSize, archiveSize)

if len(files) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_full_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ echo "backup start with stats..."
unset BR_LOG_TO_TERM
cluster_index_before_backup=$(run_sql "show variables like '%cluster%';" | awk '{print $2}')

run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --log-file $LOG --ignore-stats=false || cat $LOG
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --log-file $LOG --ignore-stats=false --checksum=true || cat $LOG
checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

if [ "${checksum_count}" -lt "1" ];then
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_full_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ echo "backup start..."
# Do not log to terminal
unset BR_LOG_TO_TERM
# do not backup stats to test whether we can restore without stats.
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ignore-stats=true --log-file $LOG || cat $LOG
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ignore-stats=true --log-file $LOG --checksum=true || cat $LOG
BR_LOG_TO_TERM=1

checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)
Expand Down
Loading

0 comments on commit c3b8485

Please sign in to comment.