diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go
index af198f4fdea58..9a3a78031b424 100644
--- a/br/pkg/lightning/lightning.go
+++ b/br/pkg/lightning/lightning.go
@@ -435,6 +435,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
 		}
 	})
 
+	failpoint.Inject("PrintStatus", func() {
+		defer func() {
+			finished, total := l.Status()
+			o.logger.Warn("PrintStatus Failpoint",
+				zap.Int64("finished", finished),
+				zap.Int64("total", total),
+				zap.Bool("equal", finished == total))
+		}()
+	})
+
 	if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil {
 		return common.ErrInvalidTLSConfig.Wrap(err)
 	}
@@ -504,8 +514,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
 	dbMetas := mdl.GetDatabases()
 	web.BroadcastInitProgress(dbMetas)
 
-	var procedure *restore.Controller
-
 	param := &restore.ControllerParam{
 		DBMetas:           dbMetas,
 		Status:            &l.status,
@@ -516,6 +524,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
 		CheckpointName:    o.checkpointName,
 	}
 
+	var procedure *restore.Controller
 	procedure, err = restore.NewRestoreController(ctx, taskCfg, param)
 	if err != nil {
 		o.logger.Error("restore failed", log.ShortError(err))
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 0a0e05b45ac5d..10ef56507b8ce 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -232,7 +232,15 @@ type Controller struct {
 	precheckItemBuilder *PrecheckItemBuilder
 }
 
+// LightningStatus provides the finished bytes and total bytes of the current task.
+// The values are kept across a restart from checkpoint.
+// With the TiDB backend, FinishedFileSize is advanced after each chunk's data has
+// been restored to TiDB; with the local backend, it is advanced after a whole
+// engine has been imported.
+// TotalFileSize may be an estimate, so it may not equal FinishedFileSize even
+// after the task has finished.
 type LightningStatus struct {
+	backend          string
 	FinishedFileSize atomic.Int64
 	TotalFileSize    atomic.Int64
 }
@@ -353,6 +361,7 @@ func NewRestoreControllerWithPauser(
 	default:
 		return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
 	}
+	p.Status.backend = cfg.TikvImporter.Backend
 
 	var metaBuilder metaMgrBuilder
 	isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal
@@ -2427,8 +2436,13 @@ func (cr *chunkRestore) deliverLoop(
 			// comes from chunk.Chunk.Offset. so it shouldn't happen that currOffset - startOffset < 0.
 			// but we met it one time, but cannot reproduce it now, we add this check to make code more robust
 			// TODO: reproduce and find the root cause and fix it completely
-			if currOffset >= startOffset {
-				m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))
+
+			delta := currOffset - startOffset
+			if delta >= 0 {
+				m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
+				if rc.status != nil && rc.status.backend == config.BackendTiDB {
+					rc.status.FinishedFileSize.Add(delta)
+				}
 			} else {
 				deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
 					zap.Int64("start", startOffset))
@@ -2441,6 +2455,11 @@
 		}
 		failpoint.Inject("SlowDownWriteRows", func() {
 			deliverLogger.Warn("Slowed down write rows")
+			finished := rc.status.FinishedFileSize.Load()
+			total := rc.status.TotalFileSize.Load()
+			deliverLogger.Warn("PrintStatus Failpoint",
+				zap.Int64("finished", finished),
+				zap.Int64("total", total))
 		})
 		failpoint.Inject("FailAfterWriteRows", nil)
 		// TODO: for local backend, we may save checkpoint more frequently, e.g. after written
diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go
index 11038d62195ea..ad975bc2f6bad 100644
--- a/br/pkg/lightning/restore/table_restore.go
+++ b/br/pkg/lightning/restore/table_restore.go
@@ -327,7 +327,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
 			dataWorker := rc.closedEngineLimit.Apply()
 			defer rc.closedEngineLimit.Recycle(dataWorker)
 			err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
-			if rc.status != nil {
+			if rc.status != nil && rc.status.backend == config.BackendLocal {
 				for _, chunk := range ecp.Chunks {
 					rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
 				}
@@ -406,6 +406,11 @@ func (tr *TableRestore) restoreEngine(
 		if err != nil {
 			return closedEngine, errors.Trace(err)
 		}
+		if rc.status != nil && rc.status.backend == config.BackendTiDB {
+			for _, chunk := range cp.Chunks {
+				rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
+			}
+		}
 		return closedEngine, nil
 	}
 
@@ -475,6 +480,9 @@ func (tr *TableRestore) restoreEngine(
 
 	// Restore table data
 	for chunkIndex, chunk := range cp.Chunks {
+		if rc.status != nil && rc.status.backend == config.BackendTiDB {
+			rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset)
+		}
 		if chunk.Chunk.Offset >= chunk.Chunk.EndOffset {
 			continue
 		}
diff --git a/br/tests/lightning_checkpoint_columns/run.sh b/br/tests/lightning_checkpoint_columns/run.sh
index 401c75cfb9f64..5809d05a1b830 100755
--- a/br/tests/lightning_checkpoint_columns/run.sh
+++ b/br/tests/lightning_checkpoint_columns/run.sh
@@ -29,6 +29,8 @@ echo "INSERT INTO tbl (j, i) VALUES (3, 1),(4, 2);" > "$DBPATH/cp_tsr.tbl.sql"
 # Set the failpoint to kill the lightning instance as soon as one row is written
 PKG="github.com/pingcap/tidb/br/pkg/lightning/restore"
 export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)"
+# Check that after one row has been written with the tidb backend, the finished progress is updated
+export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()"
 
 # Start importing the tables.
 run_sql 'DROP DATABASE IF EXISTS cp_tsr'
@@ -40,11 +42,16 @@ set -e
 run_sql 'SELECT count(*) FROM `cp_tsr`.tbl'
 check_contains "count(*): 1"
 
+# After FailAfterWriteRows, the finished bytes should be 36, the size of the first row
+grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "finished=36"
+
 # restart lightning from checkpoint, the second line should be written successfully
-export GO_FAILPOINTS=
+# also check that after restarting from checkpoint, the final finished bytes equal the total
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()"
 set +e
 run_lightning -d "$DBPATH" --backend tidb --enable-checkpoint=1 2> /dev/null
 set -e
 
 run_sql 'SELECT j FROM `cp_tsr`.tbl WHERE i = 2;'
 check_contains "j: 4"
+grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "equal=true"
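The new `LightningStatus` counters are plain `go.uber.org/atomic` values, so a consumer only needs `Load()` on both fields. The following is a minimal, hypothetical sketch, not part of this patch, of how such a consumer might turn the two counters into a progress percentage; the `progressPercent` helper and the locally redeclared struct exist only to keep the example self-contained, and the clamp reflects the patch's own caveat that `TotalFileSize` is an estimate and may never exactly equal `FinishedFileSize`.

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// LightningStatus mirrors the exported fields of the struct added in the patch
// above (the unexported backend field is omitted); it is redeclared here only
// so the sketch compiles on its own.
type LightningStatus struct {
	FinishedFileSize atomic.Int64
	TotalFileSize    atomic.Int64
}

// progressPercent is a hypothetical helper: it reads both counters and returns
// a percentage, clamped at 100 because TotalFileSize may be an underestimate.
func progressPercent(status *LightningStatus) float64 {
	finished := status.FinishedFileSize.Load()
	total := status.TotalFileSize.Load()
	if total <= 0 {
		return 0
	}
	pct := float64(finished) / float64(total) * 100
	if pct > 100 {
		pct = 100
	}
	return pct
}

func main() {
	s := &LightningStatus{}
	s.TotalFileSize.Store(1000)
	s.FinishedFileSize.Store(360) // e.g. ten 36-byte rows delivered so far
	fmt.Printf("progress: %.1f%%\n", progressPercent(s))
}
```

Clamping instead of testing `finished == total` mirrors why the test above checks `equal=true` only after the task has fully restarted from checkpoint and finished.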