diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 8f5c18b3fc156..a1b1b7690bcb1 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -45,7 +45,6 @@ go_library( "//br/pkg/utils", "//br/pkg/version", "//pkg/config", - "//pkg/ddl", "//pkg/kv", "//pkg/parser/model", "//pkg/parser/mysql", diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8703dae12e759..e1939564400d9 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/br/pkg/streamhelper/daemon" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util/cdcutil" @@ -469,10 +468,6 @@ func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error { return errors.New("Unable to create task about log-backup. " + "please set TiKV config `log-backup.enable` to true and restart TiKVs.") } - if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) { - return errors.Annotate(berrors.ErrUnknown, - "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.") - } return nil } diff --git a/br/tests/br_pitr_failpoint/check/check_ingest_repair.sh b/br/tests/br_pitr_failpoint/check/check_ingest_repair.sh new file mode 100644 index 0000000000000..d0ebcd97d42c8 --- /dev/null +++ b/br/tests/br_pitr_failpoint/check/check_ingest_repair.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu + +# check index schema +## check table test.pairs +run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';" +check_contains "Column_name: y" +check_contains "Column_name: z" + +# check index data +run_sql "select count(*) AS RESCNT from test.pairs use index(i1) where y = 0 and z = 0;" +check_not_contains "RESCNT: 0" +run_sql "admin check table test.pairs;" diff --git a/br/tests/br_pitr_failpoint/intersect_data/ingest_repair1.sql b/br/tests/br_pitr_failpoint/intersect_data/ingest_repair1.sql new file mode 100644 index 0000000000000..fe37db948171a --- /dev/null +++ b/br/tests/br_pitr_failpoint/intersect_data/ingest_repair1.sql @@ -0,0 +1 @@ +ALTER TABLE test.pairs ADD INDEX i1(y, z); diff --git a/br/tests/br_pitr_failpoint/prepare_data/ingest_repair.sql b/br/tests/br_pitr_failpoint/prepare_data/ingest_repair.sql new file mode 100644 index 0000000000000..1f34c86cb8a41 --- /dev/null +++ b/br/tests/br_pitr_failpoint/prepare_data/ingest_repair.sql @@ -0,0 +1,3 @@ +CREATE TABLE test.pairs(x int auto_increment primary key, y int DEFAULT RAND(), z int DEFAULT RAND()); +INSERT INTO test.pairs (y,z) VALUES (0,0); +INSERT INTO test.pairs VALUES (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(); diff --git a/br/tests/br_pitr_failpoint/run.sh b/br/tests/br_pitr_failpoint/run.sh new file mode 100644 index 0000000000000..8ce6d4b49b526 --- /dev/null +++ b/br/tests/br_pitr_failpoint/run.sh @@ -0,0 +1,183 @@ +#!/bin/bash +# +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +. run_services +CUR=$(cd `dirname $0`; pwd) + +# const value +PREFIX="pitr_backup_failpoint" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*. +res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" +hint_sig_file_public=$TEST_DIR/hint_sig_file_public +hint_sig_file_history=$TEST_DIR/hint_sig_file_history + +# inject some failpoints for TiDB-server +export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-public=return(\"$hint_sig_file_public\");\ +github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-ddlhistory=return(\"$hint_sig_file_history\")" + +# start a new cluster +echo "restart a services" +restart_services + +# prepare the data +echo "prepare the data" +run_sql_file $CUR/prepare_data/ingest_repair.sql + +# prepare the intersect data +run_sql_file $CUR/intersect_data/ingest_repair1.sql & +sql_pid=$! + +# start the log backup task +echo "start log task" +run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log" + +# wait until the index creation is running +retry_cnt=0 +while true; do + run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index /* ingest */';" + if grep -Fq "1. row" $res_file; then + break + fi + + retry_cnt=$((retry_cnt+1)) + if [ "$retry_cnt" -gt 50 ]; then + echo 'the wait lag is too large' + exit 1 + fi + + sleep 1 +done + +# run snapshot backup 1 -- before the index becomes public +echo "run snapshot backup" +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-1" + +# advance the progress of index creation, make the index become public +touch $hint_sig_file_public + +# wait until the index creation is done +retry_cnt=0 +while true; do + run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';" + if grep -Fq "1. row" $res_file; then + break + fi + + retry_cnt=$((retry_cnt+1)) + if [ "$retry_cnt" -gt 50 ]; then + echo 'the wait lag is too large' + exit 1 + fi + + sleep 1 +done + +# run snapshot backup 2 -- before the ddl history is generated +echo "run snapshot backup" +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-2" + +# advance the progress of index creation, generate ddl history +touch $hint_sig_file_history + +# wait index creation done +wait $sql_pid + +# wait until the index creation is done +retry_cnt=0 +while true; do + run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';" + if grep -Fq "1. row" $res_file; then + break + fi + + retry_cnt=$((retry_cnt+1)) + if [ "$retry_cnt" -gt 50 ]; then + echo 'the wait lag is too large' + exit 1 + fi + + sleep 1 +done + +# clean the failpoints +export GO_FAILPOINTS="" + +# check something in the upstream +run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';" +check_contains "Column_name: y" +check_contains "Column_name: z" + +# wait checkpoint advance +echo "wait checkpoint advance" +sleep 10 +current_ts=$(echo $(($(date +%s%3N) << 18))) +echo "current ts: $current_ts" +i=0 +while true; do + # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty + log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null) + echo "log backup status: $log_backup_status" + checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end') + echo "checkpoint ts: $checkpoint_ts" + + # check whether the checkpoint ts is a number + if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then + # check whether the checkpoint has advanced + if [ $checkpoint_ts -gt $current_ts ]; then + echo "the checkpoint has advanced" + break + fi + # the checkpoint hasn't advanced + echo "the checkpoint hasn't advanced" + i=$((i+1)) + if [ "$i" -gt 50 ]; then + echo 'the checkpoint lag is too large' + exit 1 + fi + sleep 10 + else + # unknown status, maybe somewhere is wrong + echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" + exit 1 + fi +done + +# start a new cluster +echo "restart a services" +restart_services + +# PITR restore - 1 +echo "run pitr 1 -- before the index becomes public" +run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-1" > $res_file 2>&1 + +# check something in downstream cluster +echo "check br log" +check_contains "restore log success summary" +## check feature compatibility between PITR and accelerate indexing +bash $CUR/check/check_ingest_repair.sh + +# Clean the data +run_sql "DROP DATABASE test; CREATE DATABASE test;" + +# PITR restore - 2 +echo "run pitr 2 -- before the index becomes public" +run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-2" > $res_file 2>&1 + +# check something in downstream cluster +echo "check br log" +check_contains "restore log success summary" +## check feature compatibility between PITR and accelerate indexing +bash $CUR/check/check_ingest_repair.sh diff --git a/br/tests/config/tikv.toml b/br/tests/config/tikv.toml index dc42772a78e46..a469b389989e7 100644 --- a/br/tests/config/tikv.toml +++ b/br/tests/config/tikv.toml @@ -33,3 +33,6 @@ data-encryption-method = "aes256-ctr" [security.encryption.master-key] type = "file" path = "/tmp/backup_restore_test/master-key-file" + +[log-backup] +max-flush-interval = "50s" diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh index 5dda7f47ff6b1..d588c03c60b7b 100755 --- a/br/tests/run_group_br_tests.sh +++ b/br/tests/run_group_br_tests.sh @@ -23,7 +23,7 @@ groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" - ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr' + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr br_pitr_failpoint' ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats' diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 347ef20d3142b..309d9b21de8cc 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math/rand" + "os" "strconv" "sync" "sync/atomic" @@ -918,6 +919,21 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { if job.IsDone() { job.State = model.JobStateSynced } + // Inject the failpoint to prevent the progress of index creation. + failpoint.Inject("create-index-stuck-before-ddlhistory", func(v failpoint.Value) { + if sigFile, ok := v.(string); ok && job.Type == model.ActionAddIndex { + for { + time.Sleep(1 * time.Second) + if _, err := os.Stat(sigFile); err != nil { + if os.IsNotExist(err) { + continue + } + failpoint.Return(0, errors.Trace(err)) + } + break + } + } + }) err = w.HandleJobDone(d, job, t) return 0, err } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 58bf5f5b863d6..efd4d9e576c3a 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -745,6 +745,22 @@ SwitchIndexState: indexInfo.State = model.StatePublic } + // Inject the failpoint to prevent the progress of index creation. + failpoint.Inject("create-index-stuck-before-public", func(v failpoint.Value) { + if sigFile, ok := v.(string); ok { + for { + time.Sleep(1 * time.Second) + if _, err := os.Stat(sigFile); err != nil { + if os.IsNotExist(err) { + continue + } + failpoint.Return(ver, errors.Trace(err)) + } + break + } + } + }) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != model.StatePublic) if err != nil { return ver, errors.Trace(err) @@ -858,32 +874,6 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error { return nil } -// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed. -func IngestJobsNotExisted(ctx sessionctx.Context) bool { - se := sess.NewSession(ctx) - template := "select job_meta from mysql.tidb_ddl_job where reorg and (type = %d or type = %d) and processing;" - sql := fmt.Sprintf(template, model.ActionAddIndex, model.ActionAddPrimaryKey) - rows, err := se.Execute(context.Background(), sql, "check-pitr") - if err != nil { - logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err)) - return false - } - for _, row := range rows { - jobBinary := row.GetBytes(0) - runJob := model.Job{} - err := runJob.Decode(jobBinary) - if err != nil { - logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err)) - return false - } - // Check whether this add index job is using lightning to do the backfill work. - if runJob.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { - return false - } - } - return true -} - func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) { if job.MultiSchemaInfo.Revertible {