From 81da4f8811f2fc150ed176d7ef3bcde3ce112d9f Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Tue, 7 May 2024 15:31:07 +0800 Subject: [PATCH] br: send backup request in batch (#52535) ref pingcap/tidb#52534 --- br/cmd/br/main_test.go | 1 + br/pkg/backup/client.go | 309 +++++++++++++++------------- br/pkg/backup/push.go | 10 +- br/pkg/rtree/BUILD.bazel | 3 +- br/pkg/rtree/rtree.go | 127 ++++++++++++ br/pkg/rtree/rtree_test.go | 55 +++++ br/pkg/task/backup.go | 84 ++++---- br/pkg/task/backup_raw.go | 12 +- br/pkg/task/backup_txn.go | 8 +- br/tests/br_300_small_tables/run.sh | 2 +- br/tests/br_tikv_outage2/run.sh | 2 +- 11 files changed, 413 insertions(+), 200 deletions(-) diff --git a/br/cmd/br/main_test.go b/br/cmd/br/main_test.go index a9cd31f60dc1a..5fa5b1439a2ec 100644 --- a/br/cmd/br/main_test.go +++ b/br/cmd/br/main_test.go @@ -47,6 +47,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer"), ) } diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 7bf187ffce6fb..d49987c9a7c62 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -80,10 +80,6 @@ const ( // We need to be more patient. backupFineGrainedMaxBackoff = 3600000 backupRetryTimes = 5 - // RangeUnit represents the progress updated counter when a range finished. - RangeUnit ProgressUnit = "range" - // RegionUnit represents the progress updated counter when a region finished. - RegionUnit ProgressUnit = "region" ) // Client is a client instructs TiKV how to do a backup. @@ -290,7 +286,7 @@ func (bc *Client) StartCheckpointRunner( backupTS uint64, ranges []rtree.Range, safePointID string, - progressCallBack func(ProgressUnit), + progressCallBack func(), ) (err error) { if bc.checkpointMeta == nil { bc.checkpointMeta = &checkpoint.CheckpointMetadataForBackup{ @@ -324,8 +320,8 @@ func (bc *Client) WaitForFinishCheckpoint(ctx context.Context, flush bool) { } } -// GetProgressRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges. -func (bc *Client) GetProgressRange(r rtree.Range) (*rtree.ProgressRange, error) { +// getProgressRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges. +func (bc *Client) getProgressRange(r rtree.Range) *rtree.ProgressRange { // use groupKey to distinguish different ranges groupKey := base64.URLEncoding.EncodeToString(r.StartKey) if bc.checkpointMeta != nil && len(bc.checkpointMeta.CheckpointDataMap) > 0 { @@ -338,7 +334,7 @@ func (bc *Client) GetProgressRange(r rtree.Range) (*rtree.ProgressRange, error) Incomplete: incomplete, Origin: r, GroupKey: groupKey, - }, nil + } } } @@ -351,11 +347,11 @@ func (bc *Client) GetProgressRange(r rtree.Range) (*rtree.ProgressRange, error) }, Origin: r, GroupKey: groupKey, - }, nil + } } // LoadCheckpointRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges. 
-func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func(ProgressUnit)) (map[string]rtree.RangeTree, error) { +func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func()) (map[string]rtree.RangeTree, error) { rangeDataMap := make(map[string]rtree.RangeTree) pastDureTime, err := checkpoint.WalkCheckpointFileForBackup(ctx, bc.storage, bc.cipher, func(groupKey string, rg checkpoint.BackupValueType) { @@ -365,7 +361,7 @@ func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack fun rangeDataMap[groupKey] = rangeTree } rangeTree.Put(rg.StartKey, rg.EndKey, rg.Files) - progressCallBack(RegionUnit) + progressCallBack() }) // we should adjust start-time of the summary to `pastDureTime` earlier @@ -787,15 +783,80 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return nil } +func (bc *Client) getProgressRanges(ranges []rtree.Range) []*rtree.ProgressRange { + prs := make([]*rtree.ProgressRange, 0, len(ranges)) + for _, r := range ranges { + prs = append(prs, bc.getProgressRange(r)) + } + return prs +} + +func buildProgressRangeTree(pranges []*rtree.ProgressRange) (rtree.ProgressRangeTree, []*kvrpcpb.KeyRange, error) { + // the response from TiKV only contains the region's key, so use the + // progress range tree to quickly seek the region's corresponding progress range. + progressRangeTree := rtree.NewProgressRangeTree() + subRangesCount := 0 + for _, pr := range pranges { + if err := progressRangeTree.Insert(pr); err != nil { + return progressRangeTree, nil, errors.Trace(err) + } + subRangesCount += len(pr.Incomplete) + } + // either the `incomplete` is origin range itself, + // or the `incomplete` is sub-ranges split by checkpoint of origin range. + subRanges := make([]*kvrpcpb.KeyRange, 0, subRangesCount) + progressRangeTree.Ascend(func(pr *rtree.ProgressRange) bool { + for _, r := range pr.Incomplete { + subRanges = append(subRanges, &kvrpcpb.KeyRange{ + StartKey: r.StartKey, + EndKey: r.EndKey, + }) + } + return true + }) + + return progressRangeTree, subRanges, nil +} + +func (bc *Client) getBackupTargetStores( + ctx context.Context, + replicaReadLabel map[string]string, +) ([]*metapb.Store, map[uint64]struct{}, error) { + allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash) + if err != nil { + return nil, nil, errors.Trace(err) + } + var targetStores []*metapb.Store + targetStoreIds := make(map[uint64]struct{}) + if len(replicaReadLabel) == 0 { + targetStores = allStores + } else { + for _, store := range allStores { + for _, label := range store.Labels { + if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value { + targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label + targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup + } + } + } + if len(targetStores) == 0 { + return nil, nil, errors.Errorf("no store matches replica read label: %v", replicaReadLabel) + } + } + + return targetStores, targetStoreIds, nil +} + // BackupRanges make a backup of the given key ranges. 
func (bc *Client) BackupRanges( ctx context.Context, ranges []rtree.Range, + regionCounts []int, request backuppb.BackupRequest, concurrency uint, replicaReadLabel map[string]string, metaWriter *metautil.MetaWriter, - progressCallBack func(ProgressUnit), + progressCallBack func(), ) error { log.Info("Backup Ranges Started", rtree.ZapRanges(ranges)) init := time.Now() @@ -810,154 +871,120 @@ func (bc *Client) BackupRanges( ctx = opentracing.ContextWithSpan(ctx, span1) } + targetStores, targetStoreIds, err := bc.getBackupTargetStores(ctx, replicaReadLabel) + if err != nil { + return errors.Trace(err) + } + // merge the small ranges, such as index ranges and small partition ranges, into a batch. + targetRangesBatchSize := len(targetStores) * 64 + // we collect all files in a single goroutine to avoid thread safety issues. workerPool := util.NewWorkerPool(concurrency, "Ranges") eg, ectx := errgroup.WithContext(ctx) - for id, r := range ranges { - id := id - req := request - req.StartKey, req.EndKey = r.StartKey, r.EndKey - pr, err := bc.GetProgressRange(r) - if err != nil { - return errors.Trace(err) + rangeInBatchStartIndex := 0 + regionCountInBatch := 0 + // ASSERT: len(ranges) > 0 + for idx := 0; idx <= len(ranges); idx += 1 { + if idx != len(ranges) { + if regionCountInBatch <= targetRangesBatchSize { + regionCountInBatch += regionCounts[idx] + continue + } + regionCountInBatch = regionCounts[idx] } - workerPool.ApplyOnErrorGroup(eg, func() error { - elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id)) - err := bc.BackupRange(elctx, req, replicaReadLabel, pr, metaWriter, progressCallBack) + // merge the ranges[rangeInBatchStartIndex, id) into a batch + pranges := bc.getProgressRanges(ranges[rangeInBatchStartIndex:idx]) + idxStart := rangeInBatchStartIndex + idxEnd := idx + req := request + workerPool.ApplyOnErrorGroup(eg, func() (retErr error) { + elctx := logutil.ContextWithField(ectx, + logutil.RedactAny("range-sn-start", idxStart), logutil.RedactAny("range-sn-end", idxEnd)) + + prTree, subRanges, err := buildProgressRangeTree(pranges) if err != nil { - // The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear) - if errors.Cause(err) == context.Canceled { - return errors.SuspendStack(err) + return errors.Trace(err) + } + start := time.Now() + defer func() { + minPr, _ := prTree.Min() + maxPr, _ := prTree.Max() + key := "range start: " + + hex.EncodeToString(minPr.Origin.StartKey) + + " end: " + + hex.EncodeToString(maxPr.Origin.EndKey) + logutil.CL(elctx).Info("backup range completed", zap.String("key range", key), zap.Duration("take", time.Since(start))) + if retErr != nil { + summary.CollectFailureUnit(key, err) } + }() + logutil.CL(elctx).Info("backup range started", zap.Uint64("rateLimit", req.RateLimit)) + if err := bc.pushBackupInBatch(elctx, req, prTree, subRanges, targetStores, progressCallBack); err != nil { + return errors.Trace(err) + } + if err := bc.fineGrainedBackup(elctx, req, targetStoreIds, prTree, progressCallBack); err != nil { + return errors.Trace(err) + } + if err := collectRangeFiles(prTree, metaWriter); err != nil { return errors.Trace(err) } return nil }) + + rangeInBatchStartIndex = idx } return eg.Wait() } -// BackupRange make a backup of the given key range. -// Returns an array of files backed up. 
-func (bc *Client) BackupRange( +func (bc *Client) pushBackupInBatch( ctx context.Context, request backuppb.BackupRequest, - replicaReadLabel map[string]string, - progressRange *rtree.ProgressRange, - metaWriter *metautil.MetaWriter, - progressCallBack func(ProgressUnit), -) (err error) { - start := time.Now() - defer func() { - elapsed := time.Since(start) - logutil.CL(ctx).Info("backup range completed", - logutil.Key("startKey", progressRange.Origin.StartKey), logutil.Key("endKey", progressRange.Origin.EndKey), - zap.Duration("take", elapsed)) - key := "range start:" + hex.EncodeToString(progressRange.Origin.StartKey) + " end:" + hex.EncodeToString(progressRange.Origin.EndKey) - if err != nil { - summary.CollectFailureUnit(key, err) - } - }() - logutil.CL(ctx).Info("backup range started", - logutil.Key("startKey", progressRange.Origin.StartKey), logutil.Key("endKey", progressRange.Origin.EndKey), - zap.Uint64("rateLimit", request.RateLimit)) - - allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash) + prTree rtree.ProgressRangeTree, + subRanges []*kvrpcpb.KeyRange, + targetStores []*metapb.Store, + progressCallBack func(), +) error { + logutil.CL(ctx).Info("backup push down started") + request.SubRanges = subRanges + push := newPushDown(bc.mgr, len(targetStores)) + err := push.pushBackup(ctx, request, prTree, targetStores, bc.checkpointRunner, progressCallBack) if err != nil { return errors.Trace(err) } - var targetStores []*metapb.Store - targetStoreIds := make(map[uint64]struct{}) - if len(replicaReadLabel) == 0 { - targetStores = allStores // send backup push down request to all stores - } else { - for _, store := range allStores { - for _, label := range store.Labels { - if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value { - targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label - targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup - } - } - } - } - if len(replicaReadLabel) > 0 && len(targetStores) == 0 { - return errors.Errorf("no store matches replica read label: %v", replicaReadLabel) - } + logutil.CL(ctx).Info("backup push down completed", zap.Int("range-count", len(subRanges))) + return nil +} - logutil.CL(ctx).Info("backup push down started") - // either the `incomplete` is origin range itself, - // or the `incomplete` is sub-ranges split by checkpoint of origin range - if len(progressRange.Incomplete) > 0 { - // don't make the origin request dirty, - // since fineGrainedBackup need to use it. 
- req := request - if len(progressRange.Incomplete) > 1 { - subRanges := make([]*kvrpcpb.KeyRange, 0, len(progressRange.Incomplete)) - for _, r := range progressRange.Incomplete { - subRanges = append(subRanges, &kvrpcpb.KeyRange{ - StartKey: r.StartKey, - EndKey: r.EndKey, - }) +func collectRangeFiles(progressRangeTree rtree.ProgressRangeTree, metaWriter *metautil.MetaWriter) error { + var progressRangeAscendErr error + progressRangeTree.Ascend(func(progressRange *rtree.ProgressRange) bool { + var rangeAscendErr error + progressRange.Res.Ascend(func(i btree.Item) bool { + r := i.(*rtree.Range) + for _, f := range r.Files { + summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes) } - req.SubRanges = subRanges - } else { - // compatible with older version of TiKV - req.StartKey = progressRange.Incomplete[0].StartKey - req.EndKey = progressRange.Incomplete[0].EndKey - } - - push := newPushDown(bc.mgr, len(targetStores)) - err = push.pushBackup(ctx, req, progressRange, targetStores, bc.checkpointRunner, progressCallBack) - if err != nil { - return errors.Trace(err) - } - } - logutil.CL(ctx).Info("backup push down completed", zap.Int("small-range-count", progressRange.Res.Len())) - - // Find and backup remaining ranges. - // TODO: test fine grained backup. - if err := bc.fineGrainedBackup(ctx, request, targetStoreIds, progressRange, progressCallBack); err != nil { - return errors.Trace(err) - } - - // update progress of range unit - progressCallBack(RangeUnit) - - if request.IsRawKv { - logutil.CL(ctx).Info("raw ranges backed up", - logutil.Key("startKey", progressRange.Origin.StartKey), - logutil.Key("endKey", progressRange.Origin.EndKey), - zap.String("cf", request.Cf)) - } else { - logutil.CL(ctx).Info("transactional range backup completed", - zap.Reflect("StartTS", request.StartVersion), - zap.Reflect("EndTS", request.EndVersion)) - } - - var ascendErr error - progressRange.Res.Ascend(func(i btree.Item) bool { - r := i.(*rtree.Range) - for _, f := range r.Files { - summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs) - summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes) - } - // we need keep the files in order after we support multi_ingest sst. - // default_sst and write_sst need to be together. - if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil { - ascendErr = err + // we need keep the files in order after we support multi_ingest sst. + // default_sst and write_sst need to be together. + if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil { + rangeAscendErr = err + return false + } + return true + }) + if rangeAscendErr != nil { + progressRangeAscendErr = rangeAscendErr return false } + + // Check if there are duplicated files + checkDupFiles(&progressRange.Res) return true }) - if ascendErr != nil { - return errors.Trace(ascendErr) - } - - // Check if there are duplicated files. 
- checkDupFiles(&progressRange.Res) - return nil + return errors.Trace(progressRangeAscendErr) } func (bc *Client) FindTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) { @@ -1058,8 +1085,8 @@ func (bc *Client) fineGrainedBackup( ctx context.Context, req backuppb.BackupRequest, targetStoreIds map[uint64]struct{}, - pr *rtree.ProgressRange, - progressCallBack func(ProgressUnit), + prTree rtree.ProgressRangeTree, + progressCallBack func(), ) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("Client.fineGrainedBackup", opentracing.ChildOf(span.Context())) @@ -1083,9 +1110,10 @@ func (bc *Client) fineGrainedBackup( }) bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown) + prIter := prTree.Iter() for { // Step1, check whether there is any incomplete range - incomplete := pr.Res.GetIncompleteRange(req.StartKey, req.EndKey) + incomplete := prIter.GetIncompleteRanges() if len(incomplete) == 0 { return nil } @@ -1145,6 +1173,11 @@ func (bc *Client) fineGrainedBackup( logutil.Key("fine-grained-range-start", resp.StartKey), logutil.Key("fine-grained-range-end", resp.EndKey), ) + pr, err := prTree.FindContained(resp.StartKey, resp.EndKey) + if err != nil { + logutil.CL(ctx).Panic("failed to update the backup response", + zap.Reflect("error", err)) + } if bc.checkpointRunner != nil { if err := checkpoint.AppendForBackup( ctx, @@ -1162,7 +1195,7 @@ func (bc *Client) fineGrainedBackup( bc.SetApiVersion(apiVersion) // Update progress - progressCallBack(RegionUnit) + progressCallBack() } } @@ -1171,7 +1204,7 @@ func (bc *Client) fineGrainedBackup( log.Info("handle fine grained", zap.Int("backoffMs", ms)) err := bo.BackOff() if err != nil { - return errors.Annotatef(err, "at fine-grained backup, remained ranges = %d", pr.Res.Len()) + return errors.Annotatef(err, "at fine-grained backup, remained ranges = %d", prIter.Len()) } } } diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 47fcfc13e49ac..ea14283a370e1 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -54,10 +54,10 @@ func newPushDown(mgr ClientMgr, capacity int) *pushDown { func (push *pushDown) pushBackup( ctx context.Context, req backuppb.BackupRequest, - pr *rtree.ProgressRange, + prTree rtree.ProgressRangeTree, stores []*metapb.Store, checkpointRunner *checkpoint.CheckpointRunner[checkpoint.BackupKeyType, checkpoint.BackupValueType], - progressCallBack func(ProgressUnit), + progressCallBack func(), ) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("pushDown.pushBackup", opentracing.ChildOf(span.Context())) @@ -162,6 +162,10 @@ func (push *pushDown) pushBackup( } }) if resp.GetError() == nil { + pr, err := prTree.FindContained(resp.StartKey, resp.EndKey) + if err != nil { + return errors.Annotate(err, "failed to update the backup response") + } // None error means range has been backuped successfully. 
if checkpointRunner != nil { if err := checkpoint.AppendForBackup( @@ -180,7 +184,7 @@ func (push *pushDown) pushBackup( resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles()) // Update progress - progressCallBack(RegionUnit) + progressCallBack() } else { errPb := resp.GetError() res := errContext.HandleIgnorableError(errPb, store.GetId()) diff --git a/br/pkg/rtree/BUILD.bazel b/br/pkg/rtree/BUILD.bazel index 4c0e7525952d0..8b4b0bb55cea3 100644 --- a/br/pkg/rtree/BUILD.bazel +++ b/br/pkg/rtree/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "@com_github_google_btree//:btree", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_log//:log", + "@com_github_pkg_errors//:errors", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", ], @@ -32,7 +33,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 4, + shard_count = 5, deps = [ ":rtree", "//pkg/kv", diff --git a/br/pkg/rtree/rtree.go b/br/pkg/rtree/rtree.go index 231e647b7fbe8..ee4648e47bb9e 100644 --- a/br/pkg/rtree/rtree.go +++ b/br/pkg/rtree/rtree.go @@ -11,6 +11,8 @@ import ( "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/util/redact" + "github.com/pkg/errors" ) // Range represents a backup response. @@ -69,6 +71,13 @@ func (rg *Range) Contains(key []byte) bool { (len(end) == 0 || bytes.Compare(key, end) < 0) } +// ContainsRange check if the range contains the region's key range. +func (rg *Range) ContainsRange(startKey, endKey []byte) bool { + start, end := rg.StartKey, rg.EndKey + return bytes.Compare(startKey, start) >= 0 && + (len(end) == 0 || bytes.Compare(endKey, end) <= 0) +} + // Less impls btree.Item. func (rg *Range) Less(than btree.Item) bool { // rg.StartKey < than.StartKey @@ -293,3 +302,121 @@ type ProgressRange struct { Origin Range GroupKey string } + +// Less impls btree.Item. +func (pr *ProgressRange) Less(than *ProgressRange) bool { + // pr.StartKey <= than.StartKey + return bytes.Compare(pr.Origin.StartKey, than.Origin.StartKey) < 0 +} + +// ProgressRangeTree is a sorted tree for ProgressRanges. +// All the progress ranges it sorted do not overlap. +type ProgressRangeTree struct { + *btree.BTreeG[*ProgressRange] +} + +// NewProgressRangeTree returns an empty range tree. +func NewProgressRangeTree() ProgressRangeTree { + return ProgressRangeTree{ + BTreeG: btree.NewG[*ProgressRange](32, (*ProgressRange).Less), + } +} + +// find is a helper function to find an item that contains the range. +func (rangeTree *ProgressRangeTree) find(pr *ProgressRange) *ProgressRange { + var ret *ProgressRange + rangeTree.DescendLessOrEqual(pr, func(item *ProgressRange) bool { + ret = item + return false + }) + + if ret == nil || !ret.Origin.Contains(pr.Origin.StartKey) { + return nil + } + + return ret +} + +// Insert inserts a ProgressRange into the tree, it will return an error if there is a overlapping range. +func (rangeTree *ProgressRangeTree) Insert(pr *ProgressRange) error { + overlap := rangeTree.find(pr) + if overlap != nil { + return errors.Errorf("failed to insert the progress range into range tree, "+ + "because there is a overlapping range. The insert item start key: %s; "+ + "The overlapped item start key: %s, end key: %s.", + redact.Key(pr.Origin.StartKey), redact.Key(overlap.Origin.StartKey), redact.Key(overlap.Origin.EndKey)) + } + rangeTree.ReplaceOrInsert(pr) + return nil +} + +// FindContained finds if there is a progress range containing the key range [startKey, endKey). 
+func (rangeTree *ProgressRangeTree) FindContained(startKey, endKey []byte) (*ProgressRange, error) { + var ret *ProgressRange + rangeTree.Descend(func(pr *ProgressRange) bool { + if bytes.Compare(pr.Origin.StartKey, startKey) <= 0 { + ret = pr + return false + } + return true + }) + + if ret == nil { + return nil, errors.Errorf("Cannot find progress range that contains the start key: %s", redact.Key(startKey)) + } + + if !ret.Origin.ContainsRange(startKey, endKey) { + return nil, errors.Errorf("The given region is not contained in the found progress range. "+ + "The region start key is %s; The progress range start key is %s, end key is %s.", + startKey, redact.Key(ret.Origin.StartKey), redact.Key(ret.Origin.EndKey)) + } + + return ret, nil +} + +type incompleteRangesFetcherItem struct { + pr *ProgressRange + complete bool +} + +type IncompleteRangesFetcher struct { + items []*incompleteRangesFetcherItem + left int +} + +func (rangeTree *ProgressRangeTree) Iter() *IncompleteRangesFetcher { + items := make([]*incompleteRangesFetcherItem, 0, rangeTree.Len()) + rangeTree.Ascend(func(item *ProgressRange) bool { + items = append(items, &incompleteRangesFetcherItem{ + pr: item, + complete: false, + }) + return true + }) + return &IncompleteRangesFetcher{ + items: items, + left: len(items), + } +} + +func (iter *IncompleteRangesFetcher) GetIncompleteRanges() []Range { + incompleteRanges := make([]Range, 0, 64*len(iter.items)) + for _, item := range iter.items { + if item.complete { + continue + } + + incomplete := item.pr.Res.GetIncompleteRange(item.pr.Origin.StartKey, item.pr.Origin.EndKey) + if len(incomplete) == 0 { + item.complete = true + iter.left -= 1 + continue + } + incompleteRanges = append(incompleteRanges, incomplete...) + } + return incompleteRanges +} + +func (iter *IncompleteRangesFetcher) Len() int { + return iter.left +} diff --git a/br/pkg/rtree/rtree_test.go b/br/pkg/rtree/rtree_test.go index 40f5f979972ca..f5b702e04072b 100644 --- a/br/pkg/rtree/rtree_test.go +++ b/br/pkg/rtree/rtree_test.go @@ -220,3 +220,58 @@ func TestRangeTreeMerge(t *testing.T) { } } } + +func buildProgressRange(startKey, endKey string) *rtree.ProgressRange { + pr := &rtree.ProgressRange{ + Res: rtree.NewRangeTree(), + Origin: rtree.Range{ + StartKey: []byte(startKey), + EndKey: []byte(endKey), + }, + } + return pr +} + +func TestProgressRangeTree(t *testing.T) { + prTree := rtree.NewProgressRangeTree() + + require.NoError(t, prTree.Insert(buildProgressRange("aa", "cc"))) + require.Error(t, prTree.Insert(buildProgressRange("bb", "cc"))) + require.Error(t, prTree.Insert(buildProgressRange("bb", "dd"))) + require.NoError(t, prTree.Insert(buildProgressRange("cc", "dd"))) + require.NoError(t, prTree.Insert(buildProgressRange("ee", "ff"))) + + prIter := prTree.Iter() + ranges := prIter.GetIncompleteRanges() + require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("cc")}, ranges[0]) + require.Equal(t, rtree.Range{StartKey: []byte("cc"), EndKey: []byte("dd")}, ranges[1]) + require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2]) + + pr, err := prTree.FindContained([]byte("aaa"), []byte("b")) + require.NoError(t, err) + pr.Res.Put([]byte("aaa"), []byte("b"), nil) + + pr, err = prTree.FindContained([]byte("cc"), []byte("dd")) + require.NoError(t, err) + pr.Res.Put([]byte("cc"), []byte("dd"), nil) + + ranges = prIter.GetIncompleteRanges() + require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("aaa")}, ranges[0]) + require.Equal(t, rtree.Range{StartKey: 
[]byte("b"), EndKey: []byte("cc")}, ranges[1]) + require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2]) + + pr, err = prTree.FindContained([]byte("aa"), []byte("aaa")) + require.NoError(t, err) + pr.Res.Put([]byte("aa"), []byte("aaa"), nil) + + pr, err = prTree.FindContained([]byte("b"), []byte("cc")) + require.NoError(t, err) + pr.Res.Put([]byte("b"), []byte("cc"), nil) + + pr, err = prTree.FindContained([]byte("ee"), []byte("ff")) + require.NoError(t, err) + pr.Res.Put([]byte("ee"), []byte("ff"), nil) + + ranges = prIter.GetIncompleteRanges() + require.Equal(t, 0, len(ranges)) +} diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 915bdb2092bd9..3376b67fb6b90 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" + "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" @@ -618,51 +619,33 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig summary.CollectInt("backup total ranges", len(ranges)) - var updateCh glue.Progress - var unit backup.ProgressUnit - if len(ranges) < 100 { - unit = backup.RegionUnit - // The number of regions need to backup - approximateRegions := 0 - for _, r := range ranges { - var regionCount int - regionCount, err = mgr.GetRegionCount(ctx, r.StartKey, r.EndKey) - if err != nil { - return errors.Trace(err) - } - approximateRegions += regionCount - } - // Redirect to log if there is no log file to avoid unreadable output. - updateCh = g.StartProgress( - ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - summary.CollectInt("backup total regions", approximateRegions) - } else { - unit = backup.RangeUnit - // To reduce the costs, we can use the range as unit of progress. - updateCh = g.StartProgress( - ctx, cmdName, int64(len(ranges)), !cfg.LogProgress) + approximateRegions, regionCounts, err := getRegionCountOfRanges(ctx, mgr, ranges) + if err != nil { + return errors.Trace(err) } + // Redirect to log if there is no log file to avoid unreadable output. 
+	updateCh := g.StartProgress(
+		ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)
+	summary.CollectInt("backup total regions", approximateRegions)

 	progressCount := uint64(0)
-	progressCallBack := func(callBackUnit backup.ProgressUnit) {
-		if unit == callBackUnit {
-			updateCh.Inc()
+	progressCallBack := func() {
+		updateCh.Inc()
+		failpoint.Inject("progress-call-back", func(v failpoint.Value) {
+			log.Info("failpoint progress-call-back injected")
 			atomic.AddUint64(&progressCount, 1)
-			failpoint.Inject("progress-call-back", func(v failpoint.Value) {
-				log.Info("failpoint progress-call-back injected")
-				if fileName, ok := v.(string); ok {
-					f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
-					if osErr != nil {
-						log.Warn("failed to create file", zap.Error(osErr))
-					}
-					msg := []byte(fmt.Sprintf("%s:%d\n", unit, atomic.LoadUint64(&progressCount)))
-					_, err = f.Write(msg)
-					if err != nil {
-						log.Warn("failed to write data to file", zap.Error(err))
-					}
+			if fileName, ok := v.(string); ok {
+				f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
+				if osErr != nil {
+					log.Warn("failed to create file", zap.Error(osErr))
 				}
-			})
-		}
+				msg := []byte(fmt.Sprintf("region:%d\n", atomic.LoadUint64(&progressCount)))
+				_, err = f.Write(msg)
+				if err != nil {
+					log.Warn("failed to write data to file", zap.Error(err))
+				}
+			}
+		})
 	}

 	if cfg.UseCheckpoint {
@@ -701,7 +684,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 	})
 	metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
-	err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
+	err = client.BackupRanges(ctx, ranges, regionCounts, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -761,6 +744,26 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 	return nil
 }

+func getRegionCountOfRanges(
+	ctx context.Context,
+	mgr *conn.Mgr,
+	ranges []rtree.Range,
+) (int, []int, error) {
+	// The total number of regions that need to be backed up.
+	approximateRegions := 0
+	// counts[i] is the number of regions in ranges[i].
+	counts := make([]int, 0, len(ranges))
+	for _, r := range ranges {
+		regionCount, err := mgr.GetRegionCount(ctx, r.StartKey, r.EndKey)
+		if err != nil {
+			return 0, nil, errors.Trace(err)
+		}
+		approximateRegions += regionCount
+		counts = append(counts, regionCount)
+	}
+	return approximateRegions, counts, nil
+}
+
 // ParseTSString port from tidb setSnapshotTS.
func ParseTSString(ts string, tzCheck bool) (uint64, error) { if len(ts) == 0 { diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index ed7248fd21bf4..34d44073b5153 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -191,10 +191,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - progressCallBack := func(unit backup.ProgressUnit) { - if unit == backup.RangeUnit { - return - } + progressCallBack := func() { updateCh.Inc() } @@ -217,14 +214,9 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf StartKey: backupRange.StartKey, EndKey: backupRange.EndKey, } - progressRange := &rtree.ProgressRange{ - Res: rtree.NewRangeTree(), - Incomplete: []rtree.Range{rg}, - Origin: rg, - } metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo) metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRange(ctx, req, map[string]string{}, progressRange, metaWriter, progressCallBack) + err = client.BackupRanges(ctx, []rtree.Range{rg}, []int{approximateRegions}, req, 1, nil, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/backup_txn.go b/br/pkg/task/backup_txn.go index d8b1d1d72591c..796330a0792a7 100644 --- a/br/pkg/task/backup_txn.go +++ b/br/pkg/task/backup_txn.go @@ -169,6 +169,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf if err != nil { return errors.Trace(err) } + regionCounts := []int{approximateRegions} summary.CollectInt("backup total regions", approximateRegions) @@ -177,10 +178,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - progressCallBack := func(unit backup.ProgressUnit) { - if unit == backup.RangeUnit { - return - } + progressCallBack := func() { updateCh.Inc() } backupTS, err := client.GetCurrentTS(ctx) @@ -204,7 +202,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo) metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, backupRanges, req, uint(cfg.Concurrency), nil, metaWriter, progressCallBack) + err = client.BackupRanges(ctx, backupRanges, regionCounts, req, 1, nil, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } diff --git a/br/tests/br_300_small_tables/run.sh b/br/tests/br_300_small_tables/run.sh index ca257fbce9b5e..bff3f80327df6 100644 --- a/br/tests/br_300_small_tables/run.sh +++ b/br/tests/br_300_small_tables/run.sh @@ -47,7 +47,7 @@ backupv2_size=`grep "backup-data-size" "${BACKUPMETAV2_LOG}" | grep -oP '\[\K[^\ echo "backup meta v2 backup size is ${backupv2_size}" export GO_FAILPOINTS="" -if [[ "$(wc -l <$PROGRESS_FILE)" == "1" ]] && [[ $(grep -c "range" $PROGRESS_FILE) == "1" ]]; +if [[ "$(wc -l <$PROGRESS_FILE)" == "1" ]] && [[ $(grep -c "region" $PROGRESS_FILE) == "1" ]]; then echo "use the correct progress unit" else diff --git a/br/tests/br_tikv_outage2/run.sh b/br/tests/br_tikv_outage2/run.sh index e27b683e248d8..521b7942203ec 100644 --- a/br/tests/br_tikv_outage2/run.sh +++ b/br/tests/br_tikv_outage2/run.sh @@ -25,7 +25,7 @@ 
github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}" rm -rf "${backup_dir:?}" - run_br backup full -s local://"$backup_dir" --concurrency 1 --ratelimit 3 & + run_br backup full --skip-goleak -s local://"$backup_dir" --concurrency 1 --ratelimit 3 & backup_pid=$! single_point_fault $failure wait $backup_pid
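
Note on the change above: the heart of the patch is the new loop in BackupRanges (br/pkg/backup/client.go) that merges consecutive ranges into batches whose accumulated region count stays near len(targetStores) * 64, builds one ProgressRangeTree per batch, and sends a single push-down request covering all of the batch's sub-ranges. Below is a minimal, self-contained Go sketch of just that grouping step; batchRanges, targetBatchSize, and the demo values are hypothetical names used for illustration and are not part of the patch, which additionally builds the progress tree, runs the push-down and fine-grained backup, and reports progress per region.

// batching_sketch.go — illustrative only; mirrors the loop shape added to
// BackupRanges, with everything else stripped away.
package main

import "fmt"

// batchRanges groups consecutive ranges (represented here only by their
// region counts) into batches, closing a batch once the accumulated region
// count exceeds targetBatchSize. Iterating up to len(regionCounts) inclusive
// flushes the final batch, matching the `idx <= len(ranges)` loop in the
// patch (which likewise assumes a non-empty input).
func batchRanges(regionCounts []int, targetBatchSize int) [][]int {
	var batches [][]int
	start := 0
	regionCountInBatch := 0
	for idx := 0; idx <= len(regionCounts); idx++ {
		if idx != len(regionCounts) {
			if regionCountInBatch <= targetBatchSize {
				regionCountInBatch += regionCounts[idx]
				continue
			}
			// The current range begins the next batch.
			regionCountInBatch = regionCounts[idx]
		}
		// Close the batch covering [start, idx). In the real code this is the
		// point where a ProgressRangeTree is built and one push-down backup
		// request is sent for all sub-ranges in the batch.
		batches = append(batches, append([]int(nil), regionCounts[start:idx]...))
		start = idx
	}
	return batches
}

func main() {
	// Many small index/partition ranges plus one large range.
	counts := []int{1, 1, 2, 1, 60, 3, 1}
	for i, batch := range batchRanges(counts, 4) {
		fmt.Printf("batch %d: region counts %v\n", i, batch)
	}
}

With the demo input and a target of 4 this prints three batches, [1 1 2 1], [60], and [3 1]: small ranges are coalesced into one request, while an oversized range simply becomes its own batch.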