From 9cb4c488fc180102408819f82a262906410fbd6c Mon Sep 17 00:00:00 2001
From: Yang Zhang
Date: Thu, 9 Feb 2023 02:06:08 -0800
Subject: [PATCH] br: Support backup replica read (#40899)

ref pingcap/tidb#40898
---
 br/pkg/backup/client.go                        | 78 ++++++++++++----
 br/pkg/task/backup.go                          | 43 +++++++--
 br/pkg/task/backup_raw.go                      |  2 +-
 .../placement_rule_with_learner_template.json  | 25 ++++++
 br/tests/br_replica_read/run.sh                | 89 +++++++++++++++++++
 5 files changed, 208 insertions(+), 29 deletions(-)
 create mode 100644 br/tests/br_replica_read/placement_rule_with_learner_template.json
 create mode 100755 br/tests/br_replica_read/run.sh

diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 49bba26612578..ce5a0d373924e 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -10,6 +10,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"math/rand"
 	"os"
 	"strings"
 	"sync"
@@ -758,6 +759,7 @@ func (bc *Client) BackupRanges(
 	ranges []rtree.Range,
 	request backuppb.BackupRequest,
 	concurrency uint,
+	replicaReadLabel map[string]string,
 	metaWriter *metautil.MetaWriter,
 	progressCallBack func(ProgressUnit),
 ) error {
@@ -787,7 +789,7 @@ func (bc *Client) BackupRanges(
 		}
 		workerPool.ApplyOnErrorGroup(eg, func() error {
 			elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
-			err := bc.BackupRange(elctx, req, pr, metaWriter, progressCallBack)
+			err := bc.BackupRange(elctx, req, replicaReadLabel, pr, metaWriter, progressCallBack)
 			if err != nil {
 				// The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear)
 				if errors.Cause(err) == context.Canceled {
@@ -807,6 +809,7 @@ func (bc *Client) BackupRanges(
 func (bc *Client) BackupRange(
 	ctx context.Context,
 	request backuppb.BackupRequest,
+	replicaReadLabel map[string]string,
 	progressRange *rtree.ProgressRange,
 	metaWriter *metautil.MetaWriter,
 	progressCallBack func(ProgressUnit),
@@ -827,11 +830,27 @@ func (bc *Client) BackupRange(
 		zap.Uint64("rateLimit", request.RateLimit),
 		zap.Uint32("concurrency", request.Concurrency))
 
-	var allStores []*metapb.Store
-	allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
+	allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
 	if err != nil {
 		return errors.Trace(err)
 	}
+	var targetStores []*metapb.Store
+	targetStoreIds := make(map[uint64]struct{})
+	if len(replicaReadLabel) == 0 {
+		targetStores = allStores // send backup push down request to all stores
+	} else {
+		for _, store := range allStores {
+			for _, label := range store.Labels {
+				if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value {
+					targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label
+					targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup
+				}
+			}
+		}
+	}
+	if len(replicaReadLabel) > 0 && len(targetStores) == 0 {
+		return errors.Errorf("no store matches replica read label: %v", replicaReadLabel)
+	}
 
 	logutil.CL(ctx).Info("backup push down started")
 	// either the `incomplete` is origin range itself,
@@ -855,8 +874,8 @@ func (bc *Client) BackupRange(
 		req.EndKey = progressRange.Incomplete[0].EndKey
 	}
 
-	push := newPushDown(bc.mgr, len(allStores))
-	err = push.pushBackup(ctx, req, progressRange, allStores, bc.checkpointRunner, progressCallBack)
+	push := newPushDown(bc.mgr, len(targetStores))
+	err = push.pushBackup(ctx, req, progressRange, targetStores, bc.checkpointRunner, progressCallBack)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -865,7 +884,7 @@ func (bc *Client) BackupRange(
 
 	// Find and backup remaining ranges.
 	// TODO: test fine grained backup.
-	if err := bc.fineGrainedBackup(ctx, request, progressRange, progressCallBack); err != nil {
+	if err := bc.fineGrainedBackup(ctx, request, targetStoreIds, progressRange, progressCallBack); err != nil {
 		return errors.Trace(err)
 	}
 
@@ -908,7 +927,7 @@ func (bc *Client) BackupRange(
 	return nil
 }
 
-func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
+func (bc *Client) findTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) {
 	// Keys are saved in encoded format in TiKV, so the key must be encoded
 	// in order to find the correct region.
 	key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
@@ -916,26 +935,46 @@
 		// better backoff.
 		region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
 		if err != nil || region == nil {
-			log.Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
+			log.Error("find region failed", zap.Error(err), zap.Reflect("region", region))
 			time.Sleep(time.Millisecond * time.Duration(100*i))
 			continue
 		}
-		if region.Leader != nil {
-			log.Info("find leader",
-				zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
-			return region.Leader, nil
+		if len(targetStoreIds) == 0 {
+			if region.Leader != nil {
+				log.Info("find leader",
+					zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
+				return region.Leader, nil
+			}
+		} else {
+			candidates := make([]*metapb.Peer, 0, len(region.Meta.Peers))
+			for _, peer := range region.Meta.Peers {
+				if _, ok := targetStoreIds[peer.StoreId]; ok {
+					candidates = append(candidates, peer)
+				}
+			}
+			if len(candidates) > 0 {
+				peer := candidates[rand.Intn(len(candidates))]
+				log.Info("find target peer for backup",
+					zap.Reflect("Peer", peer), logutil.Key("key", key))
+				return peer, nil
+			}
 		}
-		log.Warn("no region found", logutil.Key("key", key))
-		time.Sleep(time.Millisecond * time.Duration(100*i))
+
+		log.Warn("fail to find a target peer", logutil.Key("key", key))
+		time.Sleep(time.Millisecond * time.Duration(1000*i))
 		continue
 	}
-	log.Error("can not find leader", logutil.Key("key", key))
-	return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find leader")
+	log.Error("can not find a valid target peer", logutil.Key("key", key))
+	if len(targetStoreIds) == 0 {
+		return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find a valid leader for key %s", key)
+	}
+	return nil, errors.Errorf("can not find a valid target peer for key %s", key)
 }
 
 func (bc *Client) fineGrainedBackup(
 	ctx context.Context,
 	req backuppb.BackupRequest,
+	targetStoreIds map[uint64]struct{},
 	pr *rtree.ProgressRange,
 	progressCallBack func(ProgressUnit),
 ) error {
@@ -986,7 +1025,7 @@ func (bc *Client) fineGrainedBackup(
 			for rg := range retry {
 				subReq := req
 				subReq.StartKey, subReq.EndKey = rg.StartKey, rg.EndKey
-				backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, respCh)
+				backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, targetStoreIds, respCh)
 				if err != nil {
 					errCh <- err
 					return
@@ -1138,13 +1177,14 @@ func (bc *Client) handleFineGrained(
 	ctx context.Context,
 	bo *tikv.Backoffer,
 	req backuppb.BackupRequest,
+	targetStoreIds map[uint64]struct{},
 	respCh chan<- *backuppb.BackupResponse,
 ) (int, error) {
-	leader, pderr := bc.findRegionLeader(ctx, req.StartKey, req.IsRawKv)
+	targetPeer, pderr := bc.findTargetPeer(ctx, req.StartKey, req.IsRawKv, targetStoreIds)
 	if pderr != nil {
 		return 0, errors.Trace(pderr)
 	}
-	storeID := leader.GetStoreId()
+	storeID := targetPeer.GetStoreId()
 	lockResolver := bc.mgr.GetLockResolver()
 	client, err := bc.mgr.GetBackupClient(ctx, storeID)
 	if err != nil {
diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go
index ded764aaae2b1..5ddaf0e6ccd25 100644
--- a/br/pkg/task/backup.go
+++ b/br/pkg/task/backup.go
@@ -51,6 +51,7 @@ const (
 	flagUseBackupMetaV2 = "use-backupmeta-v2"
 	flagUseCheckpoint   = "use-checkpoint"
 	flagKeyspaceName    = "keyspace-name"
+	flagReplicaReadLabel = "replica-read-label"
 
 	flagGCTTL = "gcttl"
 
@@ -77,14 +78,15 @@ type CompressionConfig struct {
 type BackupConfig struct {
 	Config
 
-	TimeAgo          time.Duration `json:"time-ago" toml:"time-ago"`
-	BackupTS         uint64        `json:"backup-ts" toml:"backup-ts"`
-	LastBackupTS     uint64        `json:"last-backup-ts" toml:"last-backup-ts"`
-	GCTTL            int64         `json:"gc-ttl" toml:"gc-ttl"`
-	RemoveSchedulers bool          `json:"remove-schedulers" toml:"remove-schedulers"`
-	IgnoreStats      bool          `json:"ignore-stats" toml:"ignore-stats"`
-	UseBackupMetaV2  bool          `json:"use-backupmeta-v2"`
-	UseCheckpoint    bool          `json:"use-checkpoint" toml:"use-checkpoint"`
+	TimeAgo          time.Duration     `json:"time-ago" toml:"time-ago"`
+	BackupTS         uint64            `json:"backup-ts" toml:"backup-ts"`
+	LastBackupTS     uint64            `json:"last-backup-ts" toml:"last-backup-ts"`
+	GCTTL            int64             `json:"gc-ttl" toml:"gc-ttl"`
+	RemoveSchedulers bool              `json:"remove-schedulers" toml:"remove-schedulers"`
+	IgnoreStats      bool              `json:"ignore-stats" toml:"ignore-stats"`
+	UseBackupMetaV2  bool              `json:"use-backupmeta-v2"`
+	UseCheckpoint    bool              `json:"use-checkpoint" toml:"use-checkpoint"`
+	ReplicaReadLabel map[string]string `json:"replica-read-label" toml:"replica-read-label"`
 	CompressionConfig
 
 	// for ebs-based backup
@@ -139,6 +141,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
 
 	flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
 	_ = flags.MarkHidden(flagUseCheckpoint)
+
+	flags.String(flagReplicaReadLabel, "", "specify the label of the stores to be used for backup, e.g. 'label_key:label_value'")
 }
 
 // ParseFromFlags parses the backup-related flags from the flag set.
@@ -243,6 +247,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 		}
 	}
 
+	cfg.ReplicaReadLabel, err = parseReplicaReadLabelFlag(flags)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	return nil
 }
 
@@ -485,6 +494,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 		CompressionType:  cfg.CompressionType,
 		CompressionLevel: cfg.CompressionLevel,
 		CipherInfo:       &cfg.CipherInfo,
+		ReplicaRead:      len(cfg.ReplicaReadLabel) != 0,
 	}
 	brVersion := g.GetVersion()
 	clusterVersion, err := mgr.GetClusterVersion(ctx)
@@ -619,7 +629,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 		}()
 	}
 	metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
-	err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), metawriter, progressCallBack)
+	err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -737,3 +747,18 @@ func parseCompressionType(s string) (backuppb.CompressionType, error) {
 	}
 	return ct, nil
 }
+
+func parseReplicaReadLabelFlag(flags *pflag.FlagSet) (map[string]string, error) {
+	replicaReadLabelStr, err := flags.GetString(flagReplicaReadLabel)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if replicaReadLabelStr == "" {
+		return nil, nil
+	}
+	kv := strings.Split(replicaReadLabelStr, ":")
+	if len(kv) != 2 {
+		return nil, errors.Annotatef(berrors.ErrInvalidArgument, "invalid replica read label '%s'", replicaReadLabelStr)
+	}
+	return map[string]string{kv[0]: kv[1]}, nil
+}
diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go
index 2b46347327501..ed7248fd21bf4 100644
--- a/br/pkg/task/backup_raw.go
+++ b/br/pkg/task/backup_raw.go
@@ -224,7 +224,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
 	}
 	metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo)
 	metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
-	err = client.BackupRange(ctx, req, progressRange, metaWriter, progressCallBack)
+	err = client.BackupRange(ctx, req, map[string]string{}, progressRange, metaWriter, progressCallBack)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/br/tests/br_replica_read/placement_rule_with_learner_template.json b/br/tests/br_replica_read/placement_rule_with_learner_template.json
new file mode 100644
index 0000000000000..4c0caa5820f27
--- /dev/null
+++ b/br/tests/br_replica_read/placement_rule_with_learner_template.json
@@ -0,0 +1,25 @@
+[
+  {
+    "group_id": "pd",
+    "group_index": 0,
+    "group_override": false,
+    "rules": [
+      {
+        "group_id": "pd",
+        "id": "default",
+        "start_key": "",
+        "end_key": "",
+        "role": "voter",
+        "count": 2
+      },
+      {
+        "group_id": "pd",
+        "id": "learner",
+        "start_key": "",
+        "end_key": "",
+        "role": "learner",
+        "count": 1
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/br/tests/br_replica_read/run.sh b/br/tests/br_replica_read/run.sh
new file mode 100755
index 0000000000000..c56f54b4fbef8
--- /dev/null
+++ b/br/tests/br_replica_read/run.sh
@@ -0,0 +1,89 @@
+#!/bin/sh
+#
+# Copyright 2023 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+DB="$TEST_NAME"
+
+VOTER_COUNT=$((TIKV_COUNT-1))
+if [ "$VOTER_COUNT" -lt "1" ];then
+    echo "Skip test because there are not enough TiKV stores"
+    exit 0
+fi
+
+# set a random store to read only
+random_store_id=$(run_pd_ctl -u https://$PD_ADDR store | jq 'first(.stores[]|select(.store.labels|(.!= null and any(.key == "engine" and .value=="tiflash"))| not)|.store.id)')
+echo "random store id: $random_store_id"
+run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' 'read_only'
+
+# set a placement rule to add a learner replica for each region on the read-only store
+run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle load --out=$TEST_DIR/default_rules.json
+cat tests/br_replica_read/placement_rule_with_learner_template.json | jq ".[].rules[0].count = $VOTER_COUNT" > $TEST_DIR/placement_rule_with_learner.json
+run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle save --in $TEST_DIR/placement_rule_with_learner.json
+sleep 3 # wait for PD to apply the placement rule
+
+run_sql "CREATE DATABASE $DB;"
+
+run_sql "CREATE TABLE $DB.usertable1 ( \
+    YCSB_KEY varchar(64) NOT NULL, \
+    FIELD0 varchar(1) DEFAULT NULL, \
+    PRIMARY KEY (YCSB_KEY) \
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"
+
+run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");"
+run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");"
+
+run_sql "CREATE TABLE $DB.usertable2 ( \
+    YCSB_KEY varchar(64) NOT NULL, \
+    FIELD0 varchar(1) DEFAULT NULL, \
+    PRIMARY KEY (YCSB_KEY) \
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"
+
+run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"
+
+# backup db
+echo "backup start..."
+run_br -u https://$PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --replica-read-label '$mode:read_only'
+
+run_sql "DROP DATABASE $DB;"
+
+# restore db
+echo "restore start..."
+run_br restore db --db $DB -s "local://$TEST_DIR/$DB" -u https://$PD_ADDR
+
+run_sql "select count(*) from $DB.usertable1;"
+table1_count=$(read_result)
+echo "table1 count: $table1_count"
+if [ "$table1_count" -ne "2" ];then
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+fi
+
+run_sql "select count(*) from $DB.usertable2;"
+table2_count=$(read_result)
+echo "table2 count: $table2_count"
+if [ "$table2_count" -ne "1" ];then
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+fi
+
+# Test BR DDL query string
+echo "testing DDL query..."
+run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE TABLE'
+run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE DATABASE'
+
+run_sql "DROP DATABASE $DB;"
+run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' ''
+run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle save --in $TEST_DIR/default_rules.json
\ No newline at end of file
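
Reviewer note: the sketch below mirrors the store-selection and flag-parsing logic this patch adds (the filtering in BackupRange and parseReplicaReadLabelFlag), but with simplified stand-in types (store, label) and plain fmt errors instead of the real metapb/pflag/berrors dependencies, so it compiles and runs on its own. The helper names parseReplicaReadLabel and selectTargetStores are illustrative only and do not exist in the patch.

package main

import (
	"fmt"
	"strings"
)

// label and store are simplified stand-ins for metapb.StoreLabel and metapb.Store.
type label struct{ Key, Value string }

type store struct {
	ID     uint64
	Labels []label
}

// parseReplicaReadLabel mirrors parseReplicaReadLabelFlag: it turns a
// "key:value" flag string into a single-entry map, or nil when the flag is unset.
func parseReplicaReadLabel(s string) (map[string]string, error) {
	if s == "" {
		return nil, nil
	}
	kv := strings.Split(s, ":")
	if len(kv) != 2 {
		return nil, fmt.Errorf("invalid replica read label '%s'", s)
	}
	return map[string]string{kv[0]: kv[1]}, nil
}

// selectTargetStores mirrors the filtering added to BackupRange: with no label,
// every store is a push-down target; otherwise only stores carrying a matching
// label are, and their ids are recorded for the fine-grained phase.
func selectTargetStores(all []store, replicaReadLabel map[string]string) ([]store, map[uint64]struct{}, error) {
	targetIds := make(map[uint64]struct{})
	if len(replicaReadLabel) == 0 {
		return all, targetIds, nil
	}
	var targets []store
	for _, s := range all {
		for _, l := range s.Labels {
			if val, ok := replicaReadLabel[l.Key]; ok && val == l.Value {
				targets = append(targets, s)
				targetIds[s.ID] = struct{}{}
			}
		}
	}
	if len(targets) == 0 {
		return nil, nil, fmt.Errorf("no store matches replica read label: %v", replicaReadLabel)
	}
	return targets, targetIds, nil
}

func main() {
	// Two hypothetical stores: one labeled like the read-only learner in run.sh, one ordinary.
	stores := []store{
		{ID: 1, Labels: []label{{Key: "$mode", Value: "read_only"}}},
		{ID: 2, Labels: []label{{Key: "zone", Value: "z1"}}},
	}
	lbl, err := parseReplicaReadLabel("$mode:read_only")
	if err != nil {
		panic(err)
	}
	targets, ids, err := selectTargetStores(stores, lbl)
	if err != nil {
		panic(err)
	}
	fmt.Println(targets, ids) // only store 1 (the read_only store) is selected
}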