
br: restore checksum shouldn't rely on backup checksum #56712

Merged
merged 13 commits into from
Nov 18, 2024
4 changes: 2 additions & 2 deletions br/pkg/backup/schema.go
@@ -106,7 +106,7 @@ func (ss *Schemas) BackupSchemas(
}

var checksum *checkpoint.ChecksumItem
var exists bool = false
var exists = false
if ss.checkpointChecksum != nil && schema.tableInfo != nil {
checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID]
}
@@ -145,7 +145,7 @@ func (ss *Schemas) BackupSchemas(
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
zap.Duration("calculate-take", calculateCost))
zap.Duration("Time taken", calculateCost))
Contributor
Keep consistency with other fields?

Suggested change
zap.Duration("Time taken", calculateCost))
zap.Duration("TimeTaken", calculateCost))

}
}
if statsHandle != nil {
34 changes: 28 additions & 6 deletions br/pkg/metautil/metafile.go
@@ -166,11 +166,6 @@ type Table struct {
StatsFileIndexes []*backuppb.StatsFileIndex
}

// NoChecksum checks whether the table has a calculated checksum.
func (tbl *Table) NoChecksum() bool {
return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
}

// MetaReader wraps a reader to read both old and new version of backupmeta.
type MetaReader struct {
storage storage.ExternalStorage
@@ -235,14 +230,41 @@ func (reader *MetaReader) readDataFiles(ctx context.Context, output func(*backup
}

// ArchiveSize return the size of Archive data
func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 {
func ArchiveSize(files []*backuppb.File) uint64 {
total := uint64(0)
for _, file := range files {
total += file.Size_
}
return total
}

type ChecksumStats struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

func (stats *ChecksumStats) ChecksumExists() bool {
if stats == nil {
return false
}
Contributor
It seems a pointer receiver isn't needed here? I cannot find anywhere that a *ChecksumStats instead of a ChecksumStats is needed.

Suggested change
func (stats *ChecksumStats) ChecksumExists() bool {
if stats == nil {
return false
}
func (stats ChecksumStats) ChecksumExists() bool {

if stats.Crc64Xor == 0 && stats.TotalKvs == 0 && stats.TotalBytes == 0 {
return false
}
return true
}

// CalculateChecksumStatsOnFiles returns the ChecksumStats for the given files
func CalculateChecksumStatsOnFiles(files []*backuppb.File) ChecksumStats {
var stats ChecksumStats
for _, file := range files {
stats.Crc64Xor ^= file.Crc64Xor
stats.TotalKvs += file.TotalKvs
stats.TotalBytes += file.TotalBytes
}
return stats
}

// ReadDDLs reads the ddls from the backupmeta.
// This function is compatible with the old backupmeta.
func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) {
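The new helpers in metafile.go aggregate per-file checksums into a table-level expectation: CRC64 values combine with XOR, while KV and byte counters are plain sums. A minimal, self-contained sketch of the same aggregation (the `file` struct below is a hypothetical stand-in for `backuppb.File`, not BR's actual type):

```go
package main

import "fmt"

// file is a hypothetical stand-in for backuppb.File; only the three
// checksum-related fields matter for this sketch.
type file struct {
	Crc64Xor, TotalKvs, TotalBytes uint64
}

type checksumStats struct {
	Crc64Xor, TotalKvs, TotalBytes uint64
}

// calculate mirrors CalculateChecksumStatsOnFiles: CRC64 values combine with
// XOR (order-independent), while KV and byte counters are plain sums.
func calculate(files []file) checksumStats {
	var s checksumStats
	for _, f := range files {
		s.Crc64Xor ^= f.Crc64Xor
		s.TotalKvs += f.TotalKvs
		s.TotalBytes += f.TotalBytes
	}
	return s
}

// exists mirrors ChecksumExists: an all-zero triple means no checksum was recorded.
func (s checksumStats) exists() bool {
	return s.Crc64Xor != 0 || s.TotalKvs != 0 || s.TotalBytes != 0
}

func main() {
	files := []file{
		{Crc64Xor: 0xA5, TotalKvs: 10, TotalBytes: 1024},
		{Crc64Xor: 0x5A, TotalKvs: 5, TotalBytes: 512},
	}
	s := calculate(files)
	fmt.Printf("%#x %d %d %v\n", s.Crc64Xor, s.TotalKvs, s.TotalBytes, s.exists())
	// prints: 0xff 15 1536 true
}
```

Because XOR is order-independent, the expected table checksum can be rebuilt from the file list in any order, which is what lets restore derive it without relying on the table-level numbers stored at backup time.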
32 changes: 18 additions & 14 deletions br/pkg/restore/snap_client/client.go
@@ -467,8 +467,8 @@ func (rc *SnapClient) needLoadSchemas(backupMeta *backuppb.BackupMeta) bool {
return !(backupMeta.IsRawKv || backupMeta.IsTxnKv)
}

// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.
func (rc *SnapClient) InitBackupMeta(
// LoadSchemaIfNeededAndInitClient loads schemas from BackupMeta to initialize RestoreClient.
func (rc *SnapClient) LoadSchemaIfNeededAndInitClient(
c context.Context,
backupMeta *backuppb.BackupMeta,
backend *backuppb.StorageBackend,
@@ -989,7 +989,7 @@ func (rc *SnapClient) setSpeedLimit(ctx context.Context, rateLimit uint64) error
return nil
}

func (rc *SnapClient) execChecksum(
func (rc *SnapClient) execAndValidateChecksum(
ctx context.Context,
tbl *CreatedTable,
kvClient kv.Client,
@@ -1000,13 +1000,14 @@
zap.String("table", tbl.OldTable.Info.Name.O),
)

if tbl.OldTable.NoChecksum() {
logger.Warn("table has no checksum, skipping checksum")
expectedChecksumStats := metautil.CalculateChecksumStatsOnFiles(tbl.OldTable.Files)
if !expectedChecksumStats.ChecksumExists() {
Contributor Author (@Tristan1900, Oct 23, 2024)
not sure if we need to do the check here; if an empty table is backed up, should we check after restore that it's still empty?

Contributor
Yes, the error log is misleading.

Contributor Author
should I remove the checking here at all?

logger.Error("table has no checksum, skipping checksum")
Contributor
Would you also print the table and database name here? Also I think this can be a warning instead of an error.

Contributor Author
right, I think this line above adds the db and table as fields for all subsequent logging

	logger := log.L().With(
		zap.String("db", tbl.OldTable.DB.Name.O),
		zap.String("table", tbl.OldTable.Info.Name.O),
	)

Contributor Author
I'm a bit skeptical about this logic; should we just remove it? If an empty table is restored (would there be such a case?), we should still check the checksum to make sure it's empty. What do you think?

return nil
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.execChecksum", opentracing.ChildOf(span.Context()))
span1 := span.Tracer().StartSpan("Client.execAndValidateChecksum", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
@@ -1046,21 +1047,24 @@
}
}
}
table := tbl.OldTable
if item.Crc64xor != table.Crc64Xor ||
item.TotalKvs != table.TotalKvs ||
item.TotalBytes != table.TotalBytes {
checksumMatch := item.Crc64xor == expectedChecksumStats.Crc64Xor &&
item.TotalKvs == expectedChecksumStats.TotalKvs &&
item.TotalBytes == expectedChecksumStats.TotalBytes
failpoint.Inject("full-restore-validate-checksum", func(_ failpoint.Value) {
checksumMatch = false
})
if !checksumMatch {
logger.Error("failed in validate checksum",
zap.Uint64("origin tidb crc64", table.Crc64Xor),
zap.Uint64("expected tidb crc64", expectedChecksumStats.Crc64Xor),
zap.Uint64("calculated crc64", item.Crc64xor),
zap.Uint64("origin tidb total kvs", table.TotalKvs),
zap.Uint64("expected tidb total kvs", expectedChecksumStats.TotalKvs),
zap.Uint64("calculated total kvs", item.TotalKvs),
zap.Uint64("origin tidb total bytes", table.TotalBytes),
zap.Uint64("expected tidb total bytes", expectedChecksumStats.TotalBytes),
zap.Uint64("calculated total bytes", item.TotalBytes),
)
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
logger.Info("success in validate checksum")
logger.Info("success in validating checksum")
Contributor
Would you also add the table / database name to this log?

Contributor Author
same as above

return nil
}

2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/pipeline_items.go
@@ -166,7 +166,7 @@ func (rc *SnapClient) GoValidateChecksum(
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()
err := rc.execChecksum(c, tbl, kvClient, concurrency)
err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion br/pkg/task/BUILD.bazel
@@ -115,7 +115,7 @@ go_test(
],
embed = [":task"],
flaky = True,
shard_count = 39,
shard_count = 40,
deps = [
"//br/pkg/backup",
"//br/pkg/config",
7 changes: 7 additions & 0 deletions br/pkg/task/backup.go
@@ -800,6 +800,13 @@ func DefaultBackupConfig() BackupConfig {
if err != nil {
log.Panic("infallible operation failed.", zap.Error(err))
}

// Check if the checksum flag was set by the user
if !fs.Changed("checksum") {
Contributor
use flagChecksum instead

Contributor Author
oh right... my mistake

Contributor
It seems that in this context, fs.Changed(anything) is always false... as the flag set is newly created here

Contributor
BTW, can we override the default value of --checksum in DefineBackupFlags...?

Contributor Author
I feel like having the override in DefineBackupFlags is a bit weird, since it's technically not defining a flag; it might be a bit confusing

// If not set, disable it for backup
cfg.Checksum = false
}

return cfg
}

25 changes: 25 additions & 0 deletions br/pkg/task/backup_test.go
@@ -3,6 +3,7 @@
package task

import (
"os"
"testing"
"time"

@@ -222,3 +223,27 @@ func TestBackupConfigHash(t *testing.T) {
hashCheck(t, &testCfg, originalHash, true)
}
}

func TestDefaultBackupConfigDisableChecksum(t *testing.T) {
// Test the default configuration
cfg := DefaultBackupConfig()

// Check some default values
require.Equal(t, uint32(4), cfg.Concurrency)
require.Equal(t, uint32(2), cfg.ChecksumConcurrency)
require.False(t, cfg.SendCreds)
require.False(t, cfg.Checksum)

// Test with checksum flag set
os.Args = []string{"cmd", "--checksum=true"}
cfg = DefaultBackupConfig()
require.True(t, cfg.Checksum)
Contributor
I don't think DefaultBackupConfig will parse the command line from os.Args... TBH I'm not even sure how this case passes...

Contributor Author
yeah, you are absolutely right; I should revisit before marking the PR ready for review. It did pass locally and on CI, which is weird.


// Test with checksum flag explicitly set to false
os.Args = []string{"cmd", "--checksum=false"}
cfg = DefaultBackupConfig()
require.False(t, cfg.Checksum)

// Reset os.Args
os.Args = []string{"cmd"}
}
1 change: 1 addition & 0 deletions br/pkg/task/common.go
@@ -297,6 +297,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of checksumming in one table")

flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node")
// backup will override default to be false, restore will keep default to be true
flags.Bool(flagChecksum, true, "Run checksum at end of task")
flags.Bool(flagRemoveTiFlash, true,
"Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash")
11 changes: 6 additions & 5 deletions br/pkg/task/restore.go
@@ -801,7 +801,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
}

reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.InitBackupMeta(c, backupMeta, u, reader, cfg.LoadStats); err != nil {
if err = client.LoadSchemaIfNeededAndInitClient(c, backupMeta, u, reader, cfg.LoadStats); err != nil {
return errors.Trace(err)
}

@@ -822,7 +822,7 @@
}
}

archiveSize := reader.ArchiveSize(ctx, files)
archiveSize := metautil.ArchiveSize(files)
g.Record(summary.RestoreDataSize, archiveSize)
//restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247
g.Record("Size", archiveSize)
@@ -1108,8 +1108,9 @@
errCh := make(chan error, 32)
postHandleCh := afterTableRestoredCh(ctx, createdTables)

// pipeline checksum
if cfg.Checksum {
// pipeline checksum only when it is enabled and this is not an incremental restore, because an incremental
// backup doesn't carry enough information in its backup meta to validate the checksum
Contributor
we don't use the collection of file-level checksums to match the table-level checksum in the incremental backup. However, we use the table-level checksum in the incremental backup to match the table-level checksum in the incremental restore.

Contributor Author
could you point me to the code? I thought the table-level checksum for incremental is not calculated during backup

skipChecksum := !cfg.Checksum || isIncrementalBackup

Contributor
Got it.

if cfg.Checksum && !client.IsIncremental() {
postHandleCh = client.GoValidateChecksum(
ctx, postHandleCh, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency)
}
@@ -1124,7 +1125,7 @@

finish := dropToBlackhole(ctx, postHandleCh, errCh)

// Reset speed limit. ResetSpeedLimit must be called after client.InitBackupMeta has been called.
// Reset speed limit. ResetSpeedLimit must be called after client.LoadSchemaIfNeededAndInitClient has been called.
defer func() {
var resetErr error
// In future we may need a mechanism to set speed limit in ttl. like what we do in switchmode. TODO
4 changes: 2 additions & 2 deletions br/pkg/task/restore_raw.go
@@ -109,7 +109,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
return errors.Trace(err)
}
reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil {
if err = client.LoadSchemaIfNeededAndInitClient(c, backupMeta, u, reader, true); err != nil {
return errors.Trace(err)
}

@@ -121,7 +121,7 @@
if err != nil {
return errors.Trace(err)
}
archiveSize := reader.ArchiveSize(ctx, files)
archiveSize := metautil.ArchiveSize(files)
g.Record(summary.RestoreDataSize, archiveSize)

if len(files) == 0 {
4 changes: 2 additions & 2 deletions br/pkg/task/restore_txn.go
@@ -54,7 +54,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
return errors.Trace(err)
}
reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil {
if err = client.LoadSchemaIfNeededAndInitClient(c, backupMeta, u, reader, true); err != nil {
return errors.Trace(err)
}

@@ -63,7 +63,7 @@
}

files := backupMeta.Files
archiveSize := reader.ArchiveSize(ctx, files)
archiveSize := metautil.ArchiveSize(files)
g.Record(summary.RestoreDataSize, archiveSize)

if len(files) == 0 {
51 changes: 40 additions & 11 deletions br/tests/br_file_corruption/run.sh
@@ -22,33 +22,62 @@ CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

run_sql "CREATE DATABASE $DB;"
go-ycsb load mysql -P $CUR/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --checksum=false

filename=$(find $TEST_DIR/$DB -regex ".*.sst" | head -n 1)
filename_temp=$filename"_temp"
filename_bak=$filename"_bak"
echo "corruption" > $filename_temp
cat $filename >> $filename_temp
# Replace the single file manipulation with a loop over all .sst files
for filename in $(find $TEST_DIR/$DB -name "*.sst"); do
Contributor Author
need to corrupt every SST file because during restore not all SSTs are going to be used: backup backs up a bunch of tables, but some of them are filtered out during restore

filename_temp="${filename}_temp"
filename_bak="${filename}_bak"
echo "corruption" > "$filename_temp"
cat "$filename" >> "$filename_temp"
mv "$filename" "$filename_bak"
done

# need to drop db otherwise restore will fail because of cluster not fresh but not the expected issue
Contributor Author (@Tristan1900, Oct 23, 2024)
the previous br_file_corruption test was not testing what it should: restore failed because the cluster was not fresh, not because of the corruption.

run_sql "DROP DATABASE IF EXISTS $DB;"

# file lost
mv $filename $filename_bak
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)"
restore_fail=0
run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" || restore_fail=1
export GO_FAILPOINTS=""
if [ $restore_fail -ne 1 ]; then
echo 'restore success'
echo 'expect restore to fail on file lost but succeed'
exit 1
fi
run_sql "DROP DATABASE IF EXISTS $DB;"

# file corruption
mv $filename_temp $filename
truncate --size=-11 $filename
for filename in $(find $TEST_DIR/$DB -name "*.sst_temp"); do
mv "$filename" "${filename%_temp}"
truncate -s 11 "${filename%_temp}"
Contributor Author
--size=-11 doesn't work on macOS

Contributor
Suggested change
truncate -s 11 "${filename%_temp}"
truncate -s -11 "${filename%_temp}"

This comment was marked as resolved.

done

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)"
restore_fail=0
run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" || restore_fail=1
export GO_FAILPOINTS=""
if [ $restore_fail -ne 1 ]; then
echo 'restore success'
echo 'expect restore to fail on file corruption but succeed'
exit 1
fi
run_sql "DROP DATABASE IF EXISTS $DB;"

# verify validating checksum is still performed even backup didn't enable it
for filename in $(find $TEST_DIR/$DB -name "*.sst_bak"); do
mv "$filename" "${filename%_bak}"
done

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/full-restore-validate-checksum=return(true)"
restore_fail=0
run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" --checksum=true || restore_fail=1
export GO_FAILPOINTS=""
if [ $restore_fail -ne 1 ]; then
echo 'expect restore to fail on checksum mismatch but succeed'
Contributor
nit: I guess if we have a corrupted SST file, the restoration fails anyway even with --checksum=false... Perhaps a better (also harder) way is to delete a file entry in the backupmeta.

Contributor Author
oh, it's not testing a corrupted SST file in this test; the valid files are moved back after the previous corruption test. This test just verifies that the checksum process runs even when backup disabled it, by injecting the failpoint into the checksum routine. Right after this test there is a sanity check that restores successfully, verifying all files are valid.

exit 1
fi
run_sql "DROP DATABASE IF EXISTS $DB;"

# sanity check restore can succeed
run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" --checksum=true
echo 'file corruption tests passed'