Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: make table existence check unified on different br client (#58211) #58263

Open
wants to merge 1 commit into
base: release-6.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,190 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return nil
}

<<<<<<< HEAD
=======
func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) {
var resp map[string]any
err = utils.WithRetry(ctx, func() error {
resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx)
return err
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
return 0, errors.Trace(err)
}

key := "max-replicas"
val, ok := resp[key]
if !ok {
return 0, errors.Errorf("key %s not found in response %v", key, resp)
}
return uint64(val.(float64)), nil
}

func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err error) {
err = utils.WithRetry(ctx, func() error {
stores, err = mgr.GetPDHTTPClient().GetStores(ctx)
return err
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
return nil, errors.Trace(err)
}
return stores, nil
}

func EstimateTikvUsage(files []*backuppb.File, replicaCnt uint64, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
if replicaCnt > storeCnt {
replicaCnt = storeCnt
}
totalSize := uint64(0)
for _, file := range files {
totalSize += file.GetSize_()
}
log.Info("estimate tikv usage", zap.Uint64("total size", totalSize), zap.Uint64("replicaCnt", replicaCnt), zap.Uint64("store count", storeCnt))
return totalSize * replicaCnt / storeCnt
}

func EstimateTiflashUsage(tables []*metautil.Table, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
tiflashTotal := uint64(0)
for _, table := range tables {
if table.Info.TiFlashReplica == nil || table.Info.TiFlashReplica.Count <= 0 {
continue
}
tableBytes := uint64(0)
for _, file := range table.Files {
tableBytes += file.GetSize_()
}
tiflashTotal += tableBytes * table.Info.TiFlashReplica.Count
}
log.Info("estimate tiflash usage", zap.Uint64("total size", tiflashTotal), zap.Uint64("store count", storeCnt))
return tiflashTotal / storeCnt
}

func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error {
available, err := units.RAMInBytes(store.Status.Available)
if err != nil {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if available <= 0 {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if uint64(available) < necessary {
return errors.Annotatef(berrors.ErrKVDiskFull, "store %d has no space left on device, available %s, necessary %s",
store.Store.ID, units.BytesSize(float64(available)), units.BytesSize(float64(necessary)))
}
return nil
}

func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, tables []*metautil.Table) error {
maxReplica, err := getMaxReplica(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
stores, err := getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}

var tikvCnt, tiflashCnt uint64 = 0, 0
for i := range stores.Stores {
store := &stores.Stores[i]
if engine.IsTiFlashHTTPResp(&store.Store) {
tiflashCnt += 1
continue
}
tikvCnt += 1
}

// We won't need to restore more than 1800 PB data at one time, right?
preserve := func(base uint64, ratio float32) uint64 {
if base > 1000*units.PB {
return base
}
return base * uint64(ratio*10) / 10
}

// The preserve rate for tikv is quite accurate, while rate for tiflash is a
// number calculated from tpcc testing with variable data sizes. 1.4 is a
// relative conservative value.
tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1)
tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4)
log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage))

err = utils.WithRetry(ctx, func() error {
stores, err = getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
for _, store := range stores.Stores {
if engine.IsTiFlashHTTPResp(&store.Store) {
if err := CheckStoreSpace(tiflashUsage, &store); err != nil {
return errors.Trace(err)
}
continue
}
if err := CheckStoreSpace(tikvUsage, &store); err != nil {
return errors.Trace(err)
}
}
return nil
}, utils.NewDiskCheckBackoffStrategy())
if err != nil {
return errors.Trace(err)
}
return nil
}

// Exhaust drains all remaining errors in the channel, into a slice of errors.
func Exhaust(ec <-chan error) []error {
out := make([]error, 0, len(ec))
for {
select {
case err := <-ec:
out = append(out, err)
default:
// errCh will NEVER be closed(ya see, it has multi sender-part),
// so we just consume the current backlog of this channel, then return.
return out
}
}
}

func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, g glue.Glue) error {
message := "table already exists: "
allUnique := true
for _, table := range tables {
_, err := mgr.GetDomain().InfoSchema().TableByName(ctx, table.DB.Name, table.Info.Name)
if err == nil {
message += fmt.Sprintf("%s.%s ", table.DB.Name, table.Info.Name)
allUnique = false
} else if !infoschema.ErrTableNotExists.Equal(err) {
return errors.Trace(err)
}
}
if !allUnique {
return errors.Annotate(berrors.ErrTablesAlreadyExisted, message)
}
return nil
}

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backuppb.File) int {
result := 0
for _, f := range files {
if strings.HasSuffix(f.GetName(), "_write.sst") {
result++
}
}
return result
}

>>>>>>> c1083de17df (br: make table existence check unified on different br client (#58211))
// dropToBlackhole drop all incoming tables into black hole,
// i.e. don't execute checksum, just increase the process anyhow.
func dropToBlackhole(
Expand Down
10 changes: 5 additions & 5 deletions br/tests/br_300_small_tables/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,15 @@ else
fi

# truncate every table
# (FIXME: drop instead of truncate. if we drop then create-table will still be executed and wastes time executing DDLs)
i=1
while [ $i -le $TABLES_COUNT ]; do
run_sql "truncate $DB.sbtest$i;"
run_sql "drop table $DB.sbtest$i;"
i=$(($i+1))
done

rm -rf $RESTORE_LOG
echo "restore 1/300 of the table start..."
run_br restore table --db $DB --table "sbtest100" --log-file $RESTORE_LOG -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --no-schema
run_br restore table --db $DB --table "sbtest100" --log-file $RESTORE_LOG -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
restore_size=`grep "restore-data-size" "${RESTORE_LOG}" | grep -oP '\[\K[^\]]+' | grep "restore-data-size" | awk -F '=' '{print $2}' | grep -oP '\d*\.?\d+'`
echo "restore data size is ${restore_size}"

Expand All @@ -98,9 +97,10 @@ else
exit 1
fi

run_sql "drop table $DB.sbtest100;"

# restore db
# (FIXME: shouldn't need --no-schema to be fast, currently the alter-auto-id DDL slows things down)
echo "restore start..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --no-schema
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

run_sql "DROP DATABASE $DB;"
68 changes: 68 additions & 0 deletions br/tests/br_check_dup_table/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/bin/sh
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"

run_sql "CREATE DATABASE $DB;"

run_sql "CREATE TABLE $DB.usertable1 ( \
YCSB_KEY varchar(64) NOT NULL, \
FIELD0 varchar(1) DEFAULT NULL, \
PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");"
run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");"

run_sql "CREATE TABLE $DB.usertable2 ( \
YCSB_KEY varchar(64) NOT NULL, \
FIELD0 varchar(1) DEFAULT NULL, \
PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"
# backup db
echo "backup start..."
run_br --pd $PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB"

run_sql "DROP DATABASE $DB;"

# restore db
echo "restore start..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l)
if [ "$table_count" -ne "2" ];then
echo "TEST: [$TEST_NAME] failed!"
exit 1
fi

# restore db again
echo "restore start..."
LOG_OUTPUT=$(run_br restore db --db "$DB" -s "local://$TEST_DIR/$DB" --pd "$PD_ADDR" 2>&1 || true)

# Check if the log contains 'ErrTableAlreadyExisted'
if ! echo "$LOG_OUTPUT" | grep -q "BR:Restore:ErrTablesAlreadyExisted"; then
echo "Error: 'ErrTableAlreadyExisted' not found in logs."
echo "Log output:"
echo "$LOG_OUTPUT"
exit 1
else
echo "restore failed as expect"
fi

run_sql "DROP DATABASE $DB;"
4 changes: 2 additions & 2 deletions br/tests/br_incompatible_tidb_config/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ run_br --pd $PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB$INCREMENTAL_
run_sql "drop schema $DB;"
# restore with ddl(create table) job one by one
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$TABLE" --ddl-batch-size=1

run_sql "drop schema $DB;"
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$INCREMENTAL_TABLE" --ddl-batch-size=1

# restore
run_sql "drop schema $DB;"
# restore with batch create table
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$TABLE" --ddl-batch-size=128

run_sql "drop schema $DB;"
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$INCREMENTAL_TABLE" --ddl-batch-size=128

run_sql "drop schema $DB;"
Expand Down
74 changes: 74 additions & 0 deletions br/tests/run_group_br_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env bash

# This script split the integration tests into 9 groups to support parallel group tests execution.
# all the integration tests are located in br/tests directory. only the directories
# containing run.sh will be considered as valid br integration tests. the script will print the total case number

set -eo pipefail

# Step 1
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
group=$1
export COV_DIR="/tmp/group_cover"
rm -rf $COV_DIR
mkdir -p $COV_DIR

# Define groups
# Note: If new group is added, the group name must also be added to CI
# * https://github.com/PingCAP-QE/ci/blob/main/pipelines/pingcap/tidb/latest/pull_br_integration_test.groovy
# Each group of tests consumes as much time as possible, thus reducing CI waiting time.
# Putting multiple light tests together and heavy tests in a separate group.
declare -A groups
groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv br_tidb_placement_policy"
["G01"]="br_autoid br_crypter2 br_db br_check_dup_table br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl br_tiflash"
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table '
["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index'
["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption'
["G07"]='br_pitr'
["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict'
)

# Get other cases not in groups, to avoid missing any case
others=()
for script in "$CUR"/*/run.sh; do
test_name="$(basename "$(dirname "$script")")"
if [[ $test_name != br* ]]; then
continue
fi
# shellcheck disable=SC2076
if [[ ! " ${groups[*]} " =~ " ${test_name} " ]]; then
others=("${others[@]} ${test_name}")
fi
done

# enable local encryption for all tests
ENABLE_ENCRYPTION=true
export ENABLE_ENCRYPTION

if [[ "$group" == "others" ]]; then
if [[ -z $others ]]; then
echo "All br integration test cases have been added to groups"
exit 0
fi
echo "Error: "$others" is not added to any group in br/tests/run_group_br_tests.sh"
exit 1
elif [[ " ${!groups[*]} " =~ " ${group} " ]]; then
test_names="${groups[${group}]}"
# Run test cases
if [[ -n $test_names ]]; then
echo ""
echo "Run cases: ${test_names}"
for case_name in $test_names; do
echo "Run cases: ${case_name}"
rm -rf /tmp/backup_restore_test
mkdir -p /tmp/backup_restore_test
TEST_NAME=${case_name} ${CUR}/run.sh
done
fi
else
echo "Error: invalid group name: ${group}"
exit 1
fi
Loading