diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 3fd3aeb8adfed..4eb1762ebae87 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -197,10 +197,13 @@ func (push *pushDown) pushBackup( if len(errMsg) <= 0 { errMsg = errPb.Msg } +<<<<<<< HEAD return res, errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s", +======= + return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s", +>>>>>>> d16f4c0ed0 (pitr: prevent from restore point to cluster running log backup (#40871)) store.GetId(), redact.String(store.GetAddress()), - req.StorageBackend.String(), errMsg, ) } diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index db90d11db1d34..24da433058944 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1698,6 +1698,7 @@ func (rc *Client) RestoreKVFiles( deleteFiles = append(deleteFiles, file) continue } +<<<<<<< HEAD fileReplica := file applyFunc(fileReplica) } @@ -1708,11 +1709,22 @@ func (rc *Client) RestoreKVFiles( fileReplica := file applyFunc(fileReplica) } +======= + return errors.Trace(err) + }) + + if err = eg.Wait(); err != nil { + summary.CollectFailureUnit("file", err) + log.Error("restore files failed", zap.Error(err)) + } + +>>>>>>> d16f4c0ed0 (pitr: prevent from restore point to cluster running log backup (#40871)) log.Info("total skip files due to table id not matched", zap.Int("count", skipFile)) if skipFile > 0 { log.Debug("table id in full backup storage", zap.Any("tables", rules)) } +<<<<<<< HEAD if err = eg.Wait(); err != nil { summary.CollectFailureUnit("file", err) log.Error( @@ -1722,6 +1734,9 @@ func (rc *Client) RestoreKVFiles( return errors.Trace(err) } return nil +======= + return errors.Trace(err) +>>>>>>> d16f4c0ed0 (pitr: prevent from restore point to cluster running log backup (#40871)) } func (rc *Client) CleanUpKVFiles( diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 244af431a4c54..94d8bf4998bf0 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -339,12 +339,29 @@ func IsStreamRestore(cmdName string) bool { // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { +<<<<<<< HEAD +======= + if err := checkTaskExists(c, cfg); err != nil { + return errors.Annotate(err, "failed to check task exits") + } + + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceName = cfg.KeyspaceName + }) +>>>>>>> d16f4c0ed0 (pitr: prevent from restore point to cluster running log backup (#40871)) if IsStreamRestore(cmdName) { cfg.adjustRestoreConfigForStreamRestore() return RunStreamRestore(c, g, cmdName, cfg) } + return runRestore(c, g, cmdName, cfg) +} +<<<<<<< HEAD cfg.adjustRestoreConfig() +======= +func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { + cfg.Adjust() +>>>>>>> d16f4c0ed0 (pitr: prevent from restore point to cluster running log backup (#40871)) defer summary.Summary(cmdName) ctx, cancel := context.WithCancel(c) defer cancel() diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index bd85db47da680..c04b5b0f4f067 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -774,8 +774,38 @@ func RunStreamResume( return nil } +<<<<<<< HEAD func checkConfigForStatus(cfg *StreamConfig) error { if len(cfg.PD) == 0 { +======= +func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error { + ctx, cancel := context.WithCancel(c) + defer cancel() + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), + cfg.CheckRequirements, false, conn.StreamVersionChecker) + if err != nil { + return err + } + + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return err + } + env := streamhelper.CliEnv(mgr.StoreManager, etcdCLI) + advancer := streamhelper.NewCheckpointAdvancer(env) + advancer.UpdateConfig(cfg.AdvancerCfg) + advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration) + loop, err := advancerd.Begin(ctx) + if err != nil { + return err + } + loop() + return nil +} + +func checkConfigForStatus(pd []string) error { + if len(pd) == 0 { +>>>>>>> d16f4c0ed0 (pitr: prevent from restore point to cluster running log backup (#40871)) return errors.Annotatef(berrors.ErrInvalidArgument, "the command needs access to PD, please specify `-u` or `--pd`") } @@ -825,7 +855,7 @@ func RunStreamStatus( ctx = opentracing.ContextWithSpan(ctx, span1) } - if err := checkConfigForStatus(cfg); err != nil { + if err := checkConfigForStatus(cfg.PD); err != nil { return err } ctl, err := makeStatusController(ctx, cfg, g) @@ -926,6 +956,32 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } +// checkTaskExists checks whether there is a log backup task running. +// If so, return an error. +func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { + if err := checkConfigForStatus(cfg.PD); err != nil { + return err + } + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return err + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + defer func() { + if err := cli.Close(); err != nil { + log.Error("failed to close the etcd client", zap.Error(err)) + } + }() + tasks, err := cli.GetAllTasks(ctx) + if err != nil { + return err + } + if len(tasks) > 0 { + return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) + } + return nil +} + // RunStreamRestore restores stream log. func RunStreamRestore( c context.Context, @@ -974,8 +1030,12 @@ func RunStreamRestore( logStorage := cfg.Config.Storage cfg.Config.Storage = cfg.FullBackupStorage // TiFlash replica is restored to down-stream on 'pitr' currently. +<<<<<<< HEAD cfg.skipTiflash = true if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil { +======= + if err = runRestore(ctx, g, FullRestoreCmd, cfg); err != nil { +>>>>>>> d16f4c0ed0 (pitr: prevent from restore point to cluster running log backup (#40871)) return errors.Trace(err) } cfg.Config.Storage = logStorage diff --git a/br/tests/br_restore_log_task_enable/run.sh b/br/tests/br_restore_log_task_enable/run.sh new file mode 100644 index 0000000000000..923f8fe7c2b33 --- /dev/null +++ b/br/tests/br_restore_log_task_enable/run.sh @@ -0,0 +1,56 @@ +#!/bin/sh +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux +DB="$TEST_NAME" +TABLE="usertable" + +# start log task +run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR + +run_sql "CREATE DATABASE $DB;" +run_sql "CREATE TABLE $DB.$TABLE (id int);" +run_sql "INSERT INTO $DB.$TABLE VALUES (1), (2), (3);" + +# backup full +run_br backup full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB;" + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# pause log task +run_br log pause --task-name 1234 --pd $PD_ADDR + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# stop log task +run_br log stop --task-name 1234 --pd $PD_ADDR + +# restore full (should be success) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB"