Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: send backup request in batch #52535

Merged
merged 7 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 157 additions & 138 deletions br/pkg/backup/client.go

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ func newPushDown(mgr ClientMgr, capacity int) *pushDown {
func (push *pushDown) pushBackup(
ctx context.Context,
req backuppb.BackupRequest,
pr *rtree.ProgressRange,
prTree rtree.ProgressRangeTree,
stores []*metapb.Store,
checkpointRunner *checkpoint.CheckpointRunner[checkpoint.BackupKeyType, checkpoint.BackupValueType],
progressCallBack func(ProgressUnit),
progressCallBack func(),
) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("pushDown.pushBackup", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -162,6 +162,10 @@ func (push *pushDown) pushBackup(
}
})
if resp.GetError() == nil {
pr, err := prTree.FindContained(resp.StartKey, resp.EndKey)
if err != nil {
return errors.Annotate(err, "failed to update the backup response")
}
// A nil error means the range has been backed up successfully.
if checkpointRunner != nil {
if err := checkpoint.AppendForBackup(
Expand All @@ -180,7 +184,7 @@ func (push *pushDown) pushBackup(
resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())

// Update progress
progressCallBack(RegionUnit)
progressCallBack()
} else {
errPb := resp.GetError()
res := errContext.HandleIgnorableError(errPb, store.GetId())
Expand Down
1 change: 1 addition & 0 deletions br/pkg/rtree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"@com_github_google_btree//:btree",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
"@com_github_pkg_errors//:errors",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
Expand Down
131 changes: 131 additions & 0 deletions br/pkg/rtree/rtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/pkg/errors"
)

// Range represents a backup response.
Expand Down Expand Up @@ -69,6 +71,13 @@ func (rg *Range) Contains(key []byte) bool {
(len(end) == 0 || bytes.Compare(key, end) < 0)
}

// ContainsRegion check if the range contains the region's key range.
func (rg *Range) ContainsRegion(startKey, endKey []byte) bool {
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
start, end := rg.StartKey, rg.EndKey
return bytes.Compare(startKey, start) >= 0 &&
(len(end) == 0 || bytes.Compare(endKey, end) <= 0)
}

// Less impls btree.Item.
func (rg *Range) Less(than btree.Item) bool {
// rg.StartKey < than.StartKey
Expand Down Expand Up @@ -293,3 +302,125 @@ type ProgressRange struct {
Origin Range
GroupKey string
}

// Less implements btree.Item. Progress ranges are ordered by the start key of
// their origin range: pr sorts before than only when
// pr.Origin.StartKey < than.Origin.StartKey (strictly).
func (pr *ProgressRange) Less(than btree.Item) bool {
	other := than.(*ProgressRange)
	return bytes.Compare(pr.Origin.StartKey, other.Origin.StartKey) < 0
}

// Compile-time check that *ProgressRange satisfies btree.Item.
var _ btree.Item = (*ProgressRange)(nil)

// ProgressRangeTree is a sorted tree for ProgressRanges.
// All the progress ranges it sorted do not overlap.
type ProgressRangeTree struct {
*btree.BTree
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
}

// NewProgressRangeTree creates a ProgressRangeTree that holds no ranges yet.
// The underlying tree is built with btree.New(32).
func NewProgressRangeTree() ProgressRangeTree {
	tree := btree.New(32)
	return ProgressRangeTree{BTree: tree}
}

// find looks up the stored progress range whose start key is the greatest one
// not exceeding pr's start key, and returns it if its origin range also
// contains pr's start key. It returns nil when no such range exists.
func (rangeTree *ProgressRangeTree) find(pr *ProgressRange) *ProgressRange {
	var candidate *ProgressRange
	// Only the first visited item (the closest one at or below pr) matters,
	// so the iteration stops immediately.
	rangeTree.DescendLessOrEqual(pr, func(item btree.Item) bool {
		candidate = item.(*ProgressRange)
		return false
	})

	if candidate != nil && candidate.Origin.Contains(pr.Origin.StartKey) {
		return candidate
	}
	return nil
}

// Insert adds pr to the tree. It returns an error, and leaves the tree
// unchanged, if pr overlaps a progress range that is already stored.
//
// Two situations count as an overlap:
//   - a stored range starting at or before pr contains pr's start key;
//   - a stored range starts inside pr, i.e. at or after pr's start key and
//     before pr's end key (an empty end key means pr is unbounded).
func (rangeTree *ProgressRangeTree) Insert(pr *ProgressRange) error {
	// Predecessor side: some stored range covers pr's start key.
	overlap := rangeTree.find(pr)
	if overlap == nil {
		// Successor side: only the nearest stored range at or after pr's start
		// key needs checking; if it starts beyond pr's end, every later
		// range does too.
		rangeTree.AscendGreaterOrEqual(pr, func(item btree.Item) bool {
			next := item.(*ProgressRange)
			if len(pr.Origin.EndKey) == 0 ||
				bytes.Compare(next.Origin.StartKey, pr.Origin.EndKey) < 0 {
				overlap = next
			}
			return false
		})
	}
	if overlap != nil {
		return errors.Errorf("failed to insert the progress range into range tree, "+
			"because there is a overlapping range. The insert item start key: %s; "+
			"The overlapped item start key: %s, end key: %s.",
			redact.Key(pr.Origin.StartKey), redact.Key(overlap.Origin.StartKey), redact.Key(overlap.Origin.EndKey))
	}
	rangeTree.ReplaceOrInsert(pr)
	return nil
}

// FindContained returns the stored progress range that contains the key range
// [startKey, endKey).
//
// It picks the stored range with the greatest start key not exceeding
// startKey and then verifies that this range covers the whole key range.
// An error is returned when no stored range starts at or before startKey, or
// when the candidate does not cover [startKey, endKey).
func (rangeTree *ProgressRangeTree) FindContained(startKey, endKey []byte) (*ProgressRange, error) {
	var ret *ProgressRange
	// Seek directly to the closest range at or below startKey instead of
	// walking the whole tree from its largest item.
	pivot := &ProgressRange{Origin: Range{StartKey: startKey}}
	rangeTree.DescendLessOrEqual(pivot, func(item btree.Item) bool {
		ret = item.(*ProgressRange)
		return false
	})

	if ret == nil {
		return nil, errors.Errorf("Cannot find progress range that contains the start key: %s.", redact.Key(startKey))
	}

	if !ret.Origin.ContainsRegion(startKey, endKey) {
		// Keys are always passed through redact.Key before being formatted,
		// the same as in the other error messages of this file.
		return nil, errors.Errorf("The given region is not contained in the found progress range. "+
			"The region start key is %s; The progress range start key is %s, end key is %s.",
			redact.Key(startKey), redact.Key(ret.Origin.StartKey), redact.Key(ret.Origin.EndKey))
	}

	return ret, nil
}

// progressRangeIterItem pairs a progress range with a flag that records
// whether the range has already been observed to have no incomplete part.
type progressRangeIterItem struct {
	// pr is the progress range being tracked.
	pr *ProgressRange
	// complete is set once GetIncompleteRanges finds nothing left for pr.
	complete bool
}

// ProgressRangeIter tracks a fixed list of progress ranges together with the
// number of them that have not been marked complete yet.
type ProgressRangeIter struct {
	// items holds one entry per tracked progress range.
	items []*progressRangeIterItem
	// left counts the entries in items that are not marked complete.
	left int
}

// Iter snapshots the progress ranges currently in the tree, in ascending
// order, into a new ProgressRangeIter. Every range starts out as incomplete.
func (rangeTree *ProgressRangeTree) Iter() *ProgressRangeIter {
	snapshot := make([]*progressRangeIterItem, 0, rangeTree.Len())
	rangeTree.Ascend(func(item btree.Item) bool {
		// complete is left at its zero value (false).
		entry := &progressRangeIterItem{pr: item.(*ProgressRange)}
		snapshot = append(snapshot, entry)
		return true
	})
	return &ProgressRangeIter{items: snapshot, left: len(snapshot)}
}

func (iter *ProgressRangeIter) GetIncompleteRanges() []Range {
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
incompleteRanges := make([]Range, 0, 64*len(iter.items))
for _, item := range iter.items {
if item.complete {
continue
}

incomplete := item.pr.Res.GetIncompleteRange(item.pr.Origin.StartKey, item.pr.Origin.EndKey)
if len(incomplete) == 0 {
item.complete = true
iter.left -= 1
continue
}
incompleteRanges = append(incompleteRanges, incomplete...)
}
return incompleteRanges
}

// Len returns the number of tracked progress ranges that have not been
// marked complete yet.
func (iter *ProgressRangeIter) Len() int {
	return iter.left
}
84 changes: 43 additions & 41 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -618,51 +619,33 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig

summary.CollectInt("backup total ranges", len(ranges))

var updateCh glue.Progress
var unit backup.ProgressUnit
if len(ranges) < 100 {
unit = backup.RegionUnit
// The number of regions need to backup
approximateRegions := 0
for _, r := range ranges {
var regionCount int
regionCount, err = mgr.GetRegionCount(ctx, r.StartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
}
approximateRegions += regionCount
}
// Redirect to log if there is no log file to avoid unreadable output.
updateCh = g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)
summary.CollectInt("backup total regions", approximateRegions)
} else {
unit = backup.RangeUnit
// To reduce the costs, we can use the range as unit of progress.
updateCh = g.StartProgress(
ctx, cmdName, int64(len(ranges)), !cfg.LogProgress)
approximateRegions, regionCounts, err := getRegionCountOfRanges(ctx, mgr, ranges)
if err != nil {
return errors.Trace(err)
}
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)
summary.CollectInt("backup total regions", approximateRegions)

progressCount := uint64(0)
progressCallBack := func(callBackUnit backup.ProgressUnit) {
if unit == callBackUnit {
updateCh.Inc()
progressCallBack := func() {
updateCh.Inc()
failpoint.Inject("progress-call-back", func(v failpoint.Value) {
log.Info("failpoint progress-call-back injected")
atomic.AddUint64(&progressCount, 1)
failpoint.Inject("progress-call-back", func(v failpoint.Value) {
log.Info("failpoint progress-call-back injected")
if fileName, ok := v.(string); ok {
f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if osErr != nil {
log.Warn("failed to create file", zap.Error(osErr))
}
msg := []byte(fmt.Sprintf("%s:%d\n", unit, atomic.LoadUint64(&progressCount)))
_, err = f.Write(msg)
if err != nil {
log.Warn("failed to write data to file", zap.Error(err))
}
if fileName, ok := v.(string); ok {
f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if osErr != nil {
log.Warn("failed to create file", zap.Error(osErr))
}
})
}
msg := []byte(fmt.Sprintf("region:%d\n", atomic.LoadUint64(&progressCount)))
_, err = f.Write(msg)
if err != nil {
log.Warn("failed to write data to file", zap.Error(err))
}
}
})
}

if cfg.UseCheckpoint {
Expand Down Expand Up @@ -701,7 +684,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
})

metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
err = client.BackupRanges(ctx, ranges, regionCounts, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -761,6 +744,25 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return nil
}

// getRegionCountOfRanges asks mgr for the number of regions covered by each
// of the given ranges.
//
// It returns the sum over all ranges and a slice whose i-th element is the
// region count of ranges[i]. If any lookup fails, the traced error is
// returned together with zero/nil results.
func getRegionCountOfRanges(
	ctx context.Context,
	mgr *conn.Mgr,
	ranges []rtree.Range,
) (int, []int, error) {
	// The total number of regions that need to be backed up.
	approximateRegions := 0
	// counts[i] is the number of regions of ranges[i].
	counts := make([]int, 0, len(ranges))
	for _, r := range ranges {
		regionCount, err := mgr.GetRegionCount(ctx, r.StartKey, r.EndKey)
		if err != nil {
			return 0, nil, errors.Trace(err)
		}
		counts = append(counts, regionCount)
		// Accumulate the total; without this the function would always
		// report zero regions overall.
		approximateRegions += regionCount
	}
	return approximateRegions, counts, nil
}

// ParseTSString port from tidb setSnapshotTS.
func ParseTSString(ts string, tzCheck bool) (uint64, error) {
if len(ts) == 0 {
Expand Down
12 changes: 2 additions & 10 deletions br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
updateCh := g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

progressCallBack := func(unit backup.ProgressUnit) {
if unit == backup.RangeUnit {
return
}
progressCallBack := func() {
updateCh.Inc()
}

Expand All @@ -217,14 +214,9 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
StartKey: backupRange.StartKey,
EndKey: backupRange.EndKey,
}
progressRange := &rtree.ProgressRange{
Res: rtree.NewRangeTree(),
Incomplete: []rtree.Range{rg},
Origin: rg,
}
metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo)
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRange(ctx, req, map[string]string{}, progressRange, metaWriter, progressCallBack)
err = client.BackupRanges(ctx, []rtree.Range{rg}, []int{approximateRegions}, req, uint(cfg.Concurrency), nil, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 3 additions & 5 deletions br/pkg/task/backup_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf
if err != nil {
return errors.Trace(err)
}
regionCounts := []int{approximateRegions}

summary.CollectInt("backup total regions", approximateRegions)

Expand All @@ -177,10 +178,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf
updateCh := g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

progressCallBack := func(unit backup.ProgressUnit) {
if unit == backup.RangeUnit {
return
}
progressCallBack := func() {
updateCh.Inc()
}
backupTS, err := client.GetCurrentTS(ctx)
Expand All @@ -204,7 +202,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf

metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo)
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRanges(ctx, backupRanges, req, uint(cfg.Concurrency), nil, metaWriter, progressCallBack)
err = client.BackupRanges(ctx, backupRanges, regionCounts, req, uint(cfg.Concurrency), nil, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_300_small_tables/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ backupv2_size=`grep "backup-data-size" "${BACKUPMETAV2_LOG}" | grep -oP '\[\K[^\
echo "backup meta v2 backup size is ${backupv2_size}"
export GO_FAILPOINTS=""

if [[ "$(wc -l <$PROGRESS_FILE)" == "1" ]] && [[ $(grep -c "range" $PROGRESS_FILE) == "1" ]];
if [[ "$(wc -l <$PROGRESS_FILE)" == "1" ]] && [[ $(grep -c "region" $PROGRESS_FILE) == "1" ]];
then
echo "use the correct progress unit"
else
Expand Down
Loading