Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: finer granularity for tidb backend finished bytes #39318

Merged
merged 13 commits into from
Nov 23, 2022
13 changes: 11 additions & 2 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
}
})

failpoint.Inject("PrintStatus", func() {
defer func() {
finished, total := l.Status()
o.logger.Warn("PrintStatus Failpoint",
zap.Int64("finished", finished),
zap.Int64("total", total),
zap.Bool("equal", finished == total))
}()
})

if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil {
return common.ErrInvalidTLSConfig.Wrap(err)
}
Expand Down Expand Up @@ -504,8 +514,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
dbMetas := mdl.GetDatabases()
web.BroadcastInitProgress(dbMetas)

var procedure *restore.Controller

param := &restore.ControllerParam{
DBMetas: dbMetas,
Status: &l.status,
Expand All @@ -516,6 +524,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
CheckpointName: o.checkpointName,
}

var procedure *restore.Controller
procedure, err = restore.NewRestoreController(ctx, taskCfg, param)
if err != nil {
o.logger.Error("restore failed", log.ShortError(err))
Expand Down
23 changes: 21 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,15 @@ type Controller struct {
precheckItemBuilder *PrecheckItemBuilder
}

// LightningStatus provides the finished bytes and total bytes of the current task.
// It should keep the value after restart from checkpoint.
// When the backend is tidb, FinishedFileSize can be counted after chunk data is
// restored to tidb. When the backend is local, it is counted after the whole
// engine is imported.
// TotalFileSize may be an estimated value, so when the task is finished, it may
// not be equal to FinishedFileSize.
type LightningStatus struct {
// backend holds the backend name copied from cfg.TikvImporter.Backend when
// the controller is created; it selects which code path advances
// FinishedFileSize (per delivered chunk data for tidb, per imported engine
// for local).
backend string
// FinishedFileSize accumulates the number of source bytes counted as finished.
FinishedFileSize atomic.Int64
// TotalFileSize is the total number of source bytes, possibly an estimate.
TotalFileSize atomic.Int64
}
Expand Down Expand Up @@ -353,6 +361,7 @@ func NewRestoreControllerWithPauser(
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
p.Status.backend = cfg.TikvImporter.Backend

var metaBuilder metaMgrBuilder
isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal
Expand Down Expand Up @@ -2427,8 +2436,13 @@ func (cr *chunkRestore) deliverLoop(
// comes from chunk.Chunk.Offset. so it shouldn't happen that currOffset - startOffset < 0.
// but we met it one time, but cannot reproduce it now, we add this check to make code more robust
// TODO: reproduce and find the root cause and fix it completely
if currOffset >= startOffset {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))

delta := currOffset - startOffset
if delta >= 0 {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(delta)
}
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
zap.Int64("start", startOffset))
Expand All @@ -2441,6 +2455,11 @@ func (cr *chunkRestore) deliverLoop(
}
failpoint.Inject("SlowDownWriteRows", func() {
deliverLogger.Warn("Slowed down write rows")
finished := rc.status.FinishedFileSize.Load()
total := rc.status.TotalFileSize.Load()
deliverLogger.Warn("PrintStatus Failpoint",
zap.Int64("finished", finished),
zap.Int64("total", total))
})
failpoint.Inject("FailAfterWriteRows", nil)
// TODO: for local backend, we may save checkpoint more frequently, e.g. after written
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
dataWorker := rc.closedEngineLimit.Apply()
defer rc.closedEngineLimit.Recycle(dataWorker)
err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
if rc.status != nil {
if rc.status != nil && rc.status.backend == config.BackendLocal {
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
Expand Down Expand Up @@ -406,6 +406,11 @@ func (tr *TableRestore) restoreEngine(
if err != nil {
return closedEngine, errors.Trace(err)
}
if rc.status != nil && rc.status.backend == config.BackendTiDB {
for _, chunk := range cp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be chunk.Chunk.EndOffset - chunk.Key.Offset, in case of strict format, where chunk.Key.Offset is not 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with strict format, can you give some reference to the behaviour?

I think you want to keep the property that the value passed to FinishedFileSize.Add is the same whether the chunk is processed without failure or recovered from a checkpoint?

Copy link
Contributor Author

@lance6716 lance6716 Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I guess strict mode splits a file into many chunks, and the split position is recorded in Key.Offset.

okJiang marked this conversation as resolved.
Show resolved Hide resolved
}
}
return closedEngine, nil
}

Expand Down Expand Up @@ -475,6 +480,9 @@ func (tr *TableRestore) restoreEngine(

// Restore table data
for chunkIndex, chunk := range cp.Chunks {
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(chunk.Chunk.Offset)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the restore is not completed at this point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I want to recover FinishedFileSize from the checkpoint; the restore process of the current run has not started yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be chunk.Chunk.Offset - chunk.Key.Offset, to count only the previously imported data?

}
if chunk.Chunk.Offset >= chunk.Chunk.EndOffset {
continue
}
Expand Down
9 changes: 8 additions & 1 deletion br/tests/lightning_checkpoint_columns/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ echo "INSERT INTO tbl (j, i) VALUES (3, 1),(4, 2);" > "$DBPATH/cp_tsr.tbl.sql"
# Set the failpoint to kill the lightning instance as soon as one row is written
PKG="github.com/pingcap/tidb/br/pkg/lightning/restore"
export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)"
# Check after 1 row is written in tidb backend, the finished progress is updated
export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()"

# Start importing the tables.
run_sql 'DROP DATABASE IF EXISTS cp_tsr'
Expand All @@ -40,11 +42,16 @@ set -e
run_sql 'SELECT count(*) FROM `cp_tsr`.tbl'
check_contains "count(*): 1"

# After FailAfterWriteRows, the finished bytes should be 36, which is the size of the first row
grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "finished=36"

# restart lightning from checkpoint, the second line should be written successfully
export GO_FAILPOINTS=
# also check that after restarting from checkpoint, the final finished bytes equal the total
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()"
set +e
run_lightning -d "$DBPATH" --backend tidb --enable-checkpoint=1 2> /dev/null
set -e

run_sql 'SELECT j FROM `cp_tsr`.tbl WHERE i = 2;'
check_contains "j: 4"
grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "equal=true"