From 14d356736afe8101d3ee7a8d2d40e5d95b02d279 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Nov 2022 18:37:14 +0800 Subject: [PATCH 01/10] lightning: recover some metrics from checkpoint Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 13 +++++++++++++ br/pkg/lightning/restore/table_restore.go | 2 ++ 2 files changed, 15 insertions(+) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index af198f4fdea58..d781d0d4b214d 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -435,6 +435,19 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti } }) + failpoint.Inject("printMetrics", func() { + defer func() { + bytesRestored := metric.ReadCounter(l.metrics.BytesCounter.WithLabelValues(metric.BytesStateRestored)) + imported := l.status.FinishedFileSize.Load() + o.logger.Warn("printMetrics Failpoint", + zap.Float64("bytesRestored", bytesRestored), + zap.Int64("imported", imported)) + if int64(bytesRestored) != imported { + o.logger.Error("printMetrics Failpoint is failed") + } + }() + }) + if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil { return common.ErrInvalidTLSConfig.Wrap(err) } diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 11038d62195ea..816203923354c 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -398,6 +398,7 @@ func (tr *TableRestore) restoreEngine( defer cancel() // all data has finished written, we can close the engine directly. if cp.Status >= checkpoints.CheckpointStatusAllWritten { + // here? engineCfg := &backend.EngineConfig{ TableInfo: tr.tableInfo, } @@ -475,6 +476,7 @@ func (tr *TableRestore) restoreEngine( // Restore table data for chunkIndex, chunk := range cp.Chunks { + // add offset? if chunk.Chunk.Offset >= chunk.Chunk.EndOffset { continue } From 29593bddf5c34decaa2f736d98d43d90f99a9e97 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Nov 2022 18:57:52 +0800 Subject: [PATCH 02/10] sync code Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 2 +- br/tests/lightning_checkpoint_chunks/run.sh | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index d781d0d4b214d..78a50cfd7ceed 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -435,7 +435,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti } }) - failpoint.Inject("printMetrics", func() { + failpoint.Inject("PrintMetrics", func() { defer func() { bytesRestored := metric.ReadCounter(l.metrics.BytesCounter.WithLabelValues(metric.BytesStateRestored)) imported := l.status.FinishedFileSize.Load() diff --git a/br/tests/lightning_checkpoint_chunks/run.sh b/br/tests/lightning_checkpoint_chunks/run.sh index 35cabe0aadfc5..cd90e9f2d824a 100755 --- a/br/tests/lightning_checkpoint_chunks/run.sh +++ b/br/tests/lightning_checkpoint_chunks/run.sh @@ -77,6 +77,7 @@ run_sql 'DROP DATABASE IF EXISTS `tidb_lightning_checkpoint_test_cpch.1234567890 # Set the failpoint to kill the lightning instance as soon as one chunk is imported, via signal mechanism # If checkpoint does work, this should only kill $CHUNK_COUNT instances of lightnings. export GO_FAILPOINTS="$TASKID_FAILPOINTS;github.com/pingcap/tidb/br/pkg/lightning/restore/KillIfImportedChunk=return" +export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintMetrics=return()" for i in $(seq "$CHUNK_COUNT"); do echo "******** Importing Chunk Now (step $i/$CHUNK_COUNT) ********" @@ -84,6 +85,7 @@ for i in $(seq "$CHUNK_COUNT"); do done verify_checkpoint_noop +read -p 123 # Repeat, but using the file checkpoint run_sql 'DROP DATABASE IF EXISTS cpch_tsr' From f4e36dcbb36f198d61e7cd0cd58fc8090237c225 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 08:43:45 +0800 Subject: [PATCH 03/10] sync code Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 15 +++++++-------- br/tests/lightning_checkpoint_chunks/run.sh | 2 -- br/tests/lightning_checkpoint_columns/run.sh | 3 +++ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 78a50cfd7ceed..778b8fe0d9925 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -435,16 +435,15 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti } }) - failpoint.Inject("PrintMetrics", func() { + failpoint.Inject("PrintStatus", func() { defer func() { - bytesRestored := metric.ReadCounter(l.metrics.BytesCounter.WithLabelValues(metric.BytesStateRestored)) - imported := l.status.FinishedFileSize.Load() - o.logger.Warn("printMetrics Failpoint", - zap.Float64("bytesRestored", bytesRestored), - zap.Int64("imported", imported)) - if int64(bytesRestored) != imported { - o.logger.Error("printMetrics Failpoint is failed") + if r := recover(); r != nil { + o.logger.Error("panic", zap.Any("r", r)) } + finished, total := l.Status() + o.logger.Warn("PrintStatus Failpoint", + zap.Int64("finished", finished), + zap.Int64("total", total)) }() }) diff --git a/br/tests/lightning_checkpoint_chunks/run.sh b/br/tests/lightning_checkpoint_chunks/run.sh index cd90e9f2d824a..35cabe0aadfc5 100755 --- a/br/tests/lightning_checkpoint_chunks/run.sh +++ b/br/tests/lightning_checkpoint_chunks/run.sh @@ -77,7 +77,6 @@ run_sql 'DROP DATABASE IF EXISTS `tidb_lightning_checkpoint_test_cpch.1234567890 # Set the failpoint to kill the lightning instance as soon as one chunk is imported, via signal mechanism # If checkpoint does work, this should only kill $CHUNK_COUNT instances of lightnings. export GO_FAILPOINTS="$TASKID_FAILPOINTS;github.com/pingcap/tidb/br/pkg/lightning/restore/KillIfImportedChunk=return" -export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintMetrics=return()" for i in $(seq "$CHUNK_COUNT"); do echo "******** Importing Chunk Now (step $i/$CHUNK_COUNT) ********" @@ -85,7 +84,6 @@ for i in $(seq "$CHUNK_COUNT"); do done verify_checkpoint_noop -read -p 123 # Repeat, but using the file checkpoint run_sql 'DROP DATABASE IF EXISTS cpch_tsr' diff --git a/br/tests/lightning_checkpoint_columns/run.sh b/br/tests/lightning_checkpoint_columns/run.sh index 401c75cfb9f64..c99428a6a2fb3 100755 --- a/br/tests/lightning_checkpoint_columns/run.sh +++ b/br/tests/lightning_checkpoint_columns/run.sh @@ -29,6 +29,7 @@ echo "INSERT INTO tbl (j, i) VALUES (3, 1),(4, 2);" > "$DBPATH/cp_tsr.tbl.sql" # Set the failpoint to kill the lightning instance as soon as one row is written PKG="github.com/pingcap/tidb/br/pkg/lightning/restore" export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)" +export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()" # Start importing the tables. run_sql 'DROP DATABASE IF EXISTS cp_tsr' @@ -40,6 +41,8 @@ set -e run_sql 'SELECT count(*) FROM `cp_tsr`.tbl' check_contains "count(*): 1" +read -p 123 + # restart lightning from checkpoint, the second line should be written successfully export GO_FAILPOINTS= set +e From d0356abf85b12fde012b732cab82e112905e9955 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 08:54:22 +0800 Subject: [PATCH 04/10] sync code Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 7 +++---- br/pkg/lightning/restore/restore.go | 9 ++++++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 778b8fe0d9925..0ffdd1ac5acda 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -436,14 +436,13 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti }) failpoint.Inject("PrintStatus", func() { + // copy to FailAfterWriteRows defer func() { - if r := recover(); r != nil { - o.logger.Error("panic", zap.Any("r", r)) - } finished, total := l.Status() o.logger.Warn("PrintStatus Failpoint", zap.Int64("finished", finished), - zap.Int64("total", total)) + zap.Int64("total", total), + zap.Bool("equal", finished == total)) }() }) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 0a0e05b45ac5d..6c6d164b2d5e0 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2442,7 +2442,14 @@ func (cr *chunkRestore) deliverLoop( failpoint.Inject("SlowDownWriteRows", func() { deliverLogger.Warn("Slowed down write rows") }) - failpoint.Inject("FailAfterWriteRows", nil) + failpoint.Inject("FailAfterWriteRows", func() { + finished := rc.status.FinishedFileSize.Load() + total := rc.status.TotalFileSize.Load() + deliverLogger.Warn("PrintStatus Failpoint", + zap.Int64("finished", finished), + zap.Int64("total", total)) + panic("FailAfterWriteRows") + }) // TODO: for local backend, we may save checkpoint more frequently, e.g. after written // 10GB kv pairs to data engine, we can do a flush for both data & index engine, then we // can safely update current checkpoint. From e97a4cc88b6f5012c893d32774b05b1dd52cd9b5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 09:51:55 +0800 Subject: [PATCH 05/10] try impl Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 3 +-- br/pkg/lightning/restore/restore.go | 9 +++++++++ br/pkg/lightning/restore/table_restore.go | 12 +++++++++--- br/tests/lightning_checkpoint_columns/run.sh | 6 ++++-- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 0ffdd1ac5acda..b18631cec1e15 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -515,8 +515,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti dbMetas := mdl.GetDatabases() web.BroadcastInitProgress(dbMetas) - var procedure *restore.Controller - param := &restore.ControllerParam{ DBMetas: dbMetas, Status: &l.status, @@ -527,6 +525,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti CheckpointName: o.checkpointName, } + var procedure *restore.Controller procedure, err = restore.NewRestoreController(ctx, taskCfg, param) if err != nil { o.logger.Error("restore failed", log.ShortError(err)) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 6c6d164b2d5e0..5df58f6a5ce0d 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -232,7 +232,15 @@ type Controller struct { precheckItemBuilder *PrecheckItemBuilder } +// LightningStatus provides the finished bytes and total bytes of the current task. +// It should keep the value after restart from checkpoint. +// When it is tidb backend, FinishedFileSize can be counted after chunk data is +// restored to tidb. When it is local backend it's counted after whole engine is +// imported. +// TotalFileSize may be an estimated value, so when the task is finished, it may +// not equal to FinishedFileSize. type LightningStatus struct { + backend string FinishedFileSize atomic.Int64 TotalFileSize atomic.Int64 } @@ -353,6 +361,7 @@ func NewRestoreControllerWithPauser( default: return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend) } + p.Status.backend = cfg.TikvImporter.Backend var metaBuilder metaMgrBuilder isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 816203923354c..e425049c8d62b 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -327,7 +327,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp dataWorker := rc.closedEngineLimit.Apply() defer rc.closedEngineLimit.Recycle(dataWorker) err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp) - if rc.status != nil { + if rc.status != nil && rc.status.backend == config.BackendLocal { for _, chunk := range ecp.Chunks { rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset) } @@ -398,7 +398,6 @@ func (tr *TableRestore) restoreEngine( defer cancel() // all data has finished written, we can close the engine directly. if cp.Status >= checkpoints.CheckpointStatusAllWritten { - // here? engineCfg := &backend.EngineConfig{ TableInfo: tr.tableInfo, } @@ -407,6 +406,11 @@ func (tr *TableRestore) restoreEngine( if err != nil { return closedEngine, errors.Trace(err) } + if rc.status != nil && rc.status.backend == config.BackendTiDB { + for _, chunk := range cp.Chunks { + rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset) + } + } return closedEngine, nil } @@ -476,7 +480,9 @@ func (tr *TableRestore) restoreEngine( // Restore table data for chunkIndex, chunk := range cp.Chunks { - // add offset? + if rc.status != nil && rc.status.backend == config.BackendTiDB { + rc.status.FinishedFileSize.Add(chunk.Chunk.Offset) + } if chunk.Chunk.Offset >= chunk.Chunk.EndOffset { continue } diff --git a/br/tests/lightning_checkpoint_columns/run.sh b/br/tests/lightning_checkpoint_columns/run.sh index c99428a6a2fb3..b796e19e0065b 100755 --- a/br/tests/lightning_checkpoint_columns/run.sh +++ b/br/tests/lightning_checkpoint_columns/run.sh @@ -28,7 +28,8 @@ echo "INSERT INTO tbl (j, i) VALUES (3, 1),(4, 2);" > "$DBPATH/cp_tsr.tbl.sql" # Set minDeliverBytes to a small enough number to only write only 1 row each time # Set the failpoint to kill the lightning instance as soon as one row is written PKG="github.com/pingcap/tidb/br/pkg/lightning/restore" -export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)" +export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=return();$PKG/SetMinDeliverBytes=return(1)" +# Check after 1 row is written in tidb backend, the finished progress is updated export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()" # Start importing the tables. @@ -44,7 +45,8 @@ check_contains "count(*): 1" read -p 123 # restart lightning from checkpoint, the second line should be written successfully -export GO_FAILPOINTS= +# also check after restart from checkpoint, final finished equals to total +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()" set +e run_lightning -d "$DBPATH" --backend tidb --enable-checkpoint=1 2> /dev/null set -e From c373c08fdeb40d1bdcd9794af153e41ef69d4e71 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 09:59:58 +0800 Subject: [PATCH 06/10] sync code Signed-off-by: lance6716 --- br/pkg/lightning/restore/restore.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 5df58f6a5ce0d..c7ce98b89f4f3 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2436,8 +2436,13 @@ func (cr *chunkRestore) deliverLoop( // comes from chunk.Chunk.Offset. so it shouldn't happen that currOffset - startOffset < 0. // but we met it one time, but cannot reproduce it now, we add this check to make code more robust // TODO: reproduce and find the root cause and fix it completely - if currOffset >= startOffset { - m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset)) + + delta := currOffset - startOffset + if delta >= 0 { + m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta)) + if rc.status != nil && rc.status.backend == config.BackendTiDB { + rc.status.FinishedFileSize.Add(delta) + } } else { deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset), zap.Int64("start", startOffset)) From 3457792185d986240fa35302ba53b9b34adae36c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 10:10:51 +0800 Subject: [PATCH 07/10] finish Signed-off-by: lance6716 --- br/tests/lightning_checkpoint_columns/run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/br/tests/lightning_checkpoint_columns/run.sh b/br/tests/lightning_checkpoint_columns/run.sh index b796e19e0065b..13d5705aae6f1 100755 --- a/br/tests/lightning_checkpoint_columns/run.sh +++ b/br/tests/lightning_checkpoint_columns/run.sh @@ -42,7 +42,8 @@ set -e run_sql 'SELECT count(*) FROM `cp_tsr`.tbl' check_contains "count(*): 1" -read -p 123 +# After FailAfterWriteRows, the finished bytes is 36 as the first row size +grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "finished=36" # restart lightning from checkpoint, the second line should be written successfully # also check after restart from checkpoint, final finished equals to total @@ -53,3 +54,4 @@ set -e run_sql 'SELECT j FROM `cp_tsr`.tbl WHERE i = 2;' check_contains "j: 4" +grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "equal=true" From 912d1a8108702fabe64bbaa3fbcf6f0279e9c8f2 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 10:13:00 +0800 Subject: [PATCH 08/10] Update br/pkg/lightning/lightning.go --- br/pkg/lightning/lightning.go | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index b18631cec1e15..9a3a78031b424 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -436,7 +436,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti }) failpoint.Inject("PrintStatus", func() { - // copy to FailAfterWriteRows defer func() { finished, total := l.Status() o.logger.Warn("PrintStatus Failpoint", From 7c1de7e3da6583bdc178a9c7ffcb5901449b4403 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 10:42:04 +0800 Subject: [PATCH 09/10] fix unexpected change Signed-off-by: lance6716 --- br/pkg/lightning/restore/restore.go | 4 +--- br/tests/lightning_checkpoint_columns/run.sh | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index c7ce98b89f4f3..10ef56507b8ce 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2455,15 +2455,13 @@ func (cr *chunkRestore) deliverLoop( } failpoint.Inject("SlowDownWriteRows", func() { deliverLogger.Warn("Slowed down write rows") - }) - failpoint.Inject("FailAfterWriteRows", func() { finished := rc.status.FinishedFileSize.Load() total := rc.status.TotalFileSize.Load() deliverLogger.Warn("PrintStatus Failpoint", zap.Int64("finished", finished), zap.Int64("total", total)) - panic("FailAfterWriteRows") }) + failpoint.Inject("FailAfterWriteRows", nil) // TODO: for local backend, we may save checkpoint more frequently, e.g. after written // 10GB kv pairs to data engine, we can do a flush for both data & index engine, then we // can safely update current checkpoint. diff --git a/br/tests/lightning_checkpoint_columns/run.sh b/br/tests/lightning_checkpoint_columns/run.sh index 13d5705aae6f1..5809d05a1b830 100755 --- a/br/tests/lightning_checkpoint_columns/run.sh +++ b/br/tests/lightning_checkpoint_columns/run.sh @@ -28,7 +28,7 @@ echo "INSERT INTO tbl (j, i) VALUES (3, 1),(4, 2);" > "$DBPATH/cp_tsr.tbl.sql" # Set minDeliverBytes to a small enough number to only write only 1 row each time # Set the failpoint to kill the lightning instance as soon as one row is written PKG="github.com/pingcap/tidb/br/pkg/lightning/restore" -export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=return();$PKG/SetMinDeliverBytes=return(1)" +export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)" # Check after 1 row is written in tidb backend, the finished progress is updated export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()" From 62506092ec0f17674d5637bdd4d7ea4afe5041cf Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Nov 2022 11:33:03 +0800 Subject: [PATCH 10/10] consider chunk.Key.Offset Signed-off-by: lance6716 --- br/pkg/lightning/restore/table_restore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index e425049c8d62b..ad975bc2f6bad 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -408,7 +408,7 @@ func (tr *TableRestore) restoreEngine( } if rc.status != nil && rc.status.backend == config.BackendTiDB { for _, chunk := range cp.Chunks { - rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset) + rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset) } } return closedEngine, nil @@ -481,7 +481,7 @@ func (tr *TableRestore) restoreEngine( // Restore table data for chunkIndex, chunk := range cp.Chunks { if rc.status != nil && rc.status.backend == config.BackendTiDB { - rc.status.FinishedFileSize.Add(chunk.Chunk.Offset) + rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset) } if chunk.Chunk.Offset >= chunk.Chunk.EndOffset { continue