From 6e522fbbc396a9c65d56191c323edc41267383b8 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Fri, 2 Sep 2022 14:16:20 +0800
Subject: [PATCH 1/4] address comment

---
 br/pkg/config/ebs.go   |  2 +-
 br/pkg/restore/data.go | 83 +++++++++++++++++++++---------------------
 2 files changed, 42 insertions(+), 43 deletions(-)

diff --git a/br/pkg/config/ebs.go b/br/pkg/config/ebs.go
index 3f9d55f541327..a92c10c73fcff 100644
--- a/br/pkg/config/ebs.go
+++ b/br/pkg/config/ebs.go
@@ -200,4 +200,4 @@ func (c *EBSBasedBRMeta) SetVolumeAZs(idMap map[string]string) {
 			volume.VolumeAZ = idMap[volume.ID]
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index 96b0f47631180..e69201ae5df51 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -103,7 +103,7 @@ func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progres
 		progress: progress}
 }
 
-func (recovery *Recovery) newTiKVRecoveryClient(ctx context.Context, tikvAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
+func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
 	// Connect to the Recovery service on the given TiKV node.
 	bfConf := backoff.DefaultConfig
 	bfConf.MaxDelay = gRPCBackOffMaxDelay
@@ -115,7 +115,7 @@ func (recovery *Recovery) newTiKVRecoveryClient(ctx context.Context, tikvAddr st
 	//keepaliveConf keepalive.ClientParameters
 	conn, err := grpc.DialContext(
 		ctx,
-		tikvAddr,
+		storeAddr,
 		opt,
 		grpc.WithBlock(),
 		grpc.FailOnNonTempDialError(true),
@@ -148,7 +148,7 @@ func getStoreAddress(allStores []*metapb.Store, storeId uint64) string {
 	return addr
 }
 
-// read all region meta from tikvs
+// ReadRegionMeta reads all region meta from tikvs
 func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
 	eg, ectx := errgroup.WithContext(ctx)
 	totalStores := len(recovery.allStores)
@@ -167,17 +167,16 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
 			return errors.Trace(err)
 		}
 
-		tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		defer conn.Close()
-
 		workers.ApplyOnErrorGroup(eg, func() error {
+			recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			defer conn.Close()
 			log.Info("read meta from tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
-			stream, err := tikvClient.ReadRegionMeta(ectx, &recovpb.ReadRegionMetaRequest{StoreId: storeId})
+			stream, err := recoveryClient.ReadRegionMeta(ectx, &recovpb.ReadRegionMetaRequest{StoreId: storeId})
 			if err != nil {
-				log.Error("read region meta failed", zap.Uint64("storeID", storeId))
+				log.Error("read region meta failed", zap.Uint64("store id", storeId))
 				return errors.Trace(err)
 			}
@@ -228,7 +227,7 @@ func (recovery *Recovery) getTotalRegions() int {
 	return len(regions)
 }
 
-// send the recovery plan to recovery region (force leader etc)
+// RecoverRegions sends the recovery plan to the regions to recover (force leader etc)
 // only tikvs have regions whose have to recover be sent
 func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
 	eg, ectx := errgroup.WithContext(ctx)
@@ -241,24 +240,24 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
 		}
 		storeAddr := getStoreAddress(recovery.allStores, storeId)
-		tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr)
-		if err != nil {
-			log.Error("create tikv client failed", zap.Uint64("storeID", storeId))
client failed", zap.Uint64("storeID", storeId)) - return errors.Trace(err) - } - defer conn.Close() - cmd := plan - storeId := storeId + recoveryPlan := plan + recoveryStoreId := storeId workers.ApplyOnErrorGroup(eg, func() error { - log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId)) - stream, err := tikvClient.RecoverRegion(ectx) + recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr) + if err != nil { + log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId)) + return errors.Trace(err) + } + defer conn.Close() + log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId)) + stream, err := recoveryClient.RecoverRegion(ectx) if err != nil { - log.Error("create recover region failed", zap.Uint64("storeID", storeId)) + log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId)) return errors.Trace(err) } // for a TiKV, send the stream - for _, s := range cmd { + for _, s := range recoveryPlan { if err = stream.Send(s); err != nil { log.Error("send region recovery region failed", zap.Error(err)) return errors.Trace(err) @@ -271,7 +270,7 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) { return errors.Trace(err) } recovery.progress.Inc() - log.Info("recovery region execution success", zap.Uint64("storeID", reply.GetStoreId())) + log.Info("recovery region execution success", zap.Uint64("store id", reply.GetStoreId())) return nil }) } @@ -279,7 +278,7 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) { return eg.Wait() } -// send wait apply to all tikv ensure all region peer apply log into the last +// WaitApply send wait apply to all tikv ensure all region peer apply log into the last func (recovery *Recovery) WaitApply(ctx context.Context) (err error) { eg, ectx := errgroup.WithContext(ctx) totalStores := len(recovery.allStores) @@ -290,24 +289,24 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) { return errors.Trace(err) } storeAddr := getStoreAddress(recovery.allStores, store.Id) - tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr) - if err != nil { - return errors.Trace(err) - } - defer conn.Close() storeId := store.Id workers.ApplyOnErrorGroup(eg, func() error { + recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr) + if err != nil { + return errors.Trace(err) + } + defer conn.Close() log.Info("send wait apply to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId)) req := &recovpb.WaitApplyRequest{StoreId: storeId} - _, err := tikvClient.WaitApply(ectx, req) + _, err = recoveryClient.WaitApply(ectx, req) if err != nil { - log.Error("wait apply failed", zap.Uint64("storeID", storeId)) + log.Error("wait apply failed", zap.Uint64("store id", storeId)) return errors.Trace(err) } recovery.progress.Inc() - log.Info("recovery wait apply execution success", zap.Uint64("storeID", storeId)) + log.Info("recovery wait apply execution success", zap.Uint64("store id", storeId)) return nil }) } @@ -315,7 +314,7 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) { return eg.Wait() } -// a worker pool to all tikv for execute delete all data whose has ts > resolvedTs +// ResolveData a worker pool to all tikv for execute delete all data whose has ts > resolvedTs func (recovery *Recovery) ResolveData(ctx context.Context, resolvedTs uint64) (err error) { eg, ectx 
 
 	eg, ectx := errgroup.WithContext(ctx)
@@ -328,18 +327,18 @@ func (recovery *Recovery) ResolveData(ctx context.Context, resolvedTs uint64) (e
 			return errors.Trace(err)
 		}
 		storeAddr := getStoreAddress(recovery.allStores, store.Id)
-		tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		defer conn.Close()
 		storeId := store.Id
 		workers.ApplyOnErrorGroup(eg, func() error {
+			recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			defer conn.Close()
 			log.Info("resolved data to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
 			req := &recovpb.ResolveKvDataRequest{ResolvedTs: resolvedTs}
-			stream, err := tikvClient.ResolveKvData(ectx, req)
+			stream, err := recoveryClient.ResolveKvData(ectx, req)
 			if err != nil {
-				log.Error("send the resolve kv data failed", zap.Uint64("storeID", storeId))
+				log.Error("send the resolve kv data failed", zap.Uint64("store id", storeId))
 				return errors.Trace(err)
 			}
 			// for a TiKV, received the stream

From b3131f9c02e1374d30273e8d1b9f6449eb885689 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Fri, 2 Sep 2022 14:18:56 +0800
Subject: [PATCH 2/4] address comment

---
 br/pkg/restore/data.go | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index e69201ae5df51..24cd87f38f333 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -28,10 +28,10 @@ import (
 // in future, num of tikv may extend to a large number, this is limitation of connection pool to tikv
 // per our knowledge, in present, 128 may a good enough.
 const (
-	max_store_concurency = 128
+	maxStoreConcurrency = 128
 )
 
-// recover the tikv cluster
+// RecoverData recovers the tikv cluster
 // 1. read all meta data from tikvs
 // 2. make recovery plan and then recovery max allocate ID firstly
 // 3. send the recover plan and the wait tikv to apply, in waitapply, all assigned region leader will check apply log to the last log
@@ -152,7 +152,7 @@ func getStoreAddress(allStores []*metapb.Store, storeId uint64) string {
 func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
 	eg, ectx := errgroup.WithContext(ctx)
 	totalStores := len(recovery.allStores)
-	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, max_store_concurency)), "Collect Region Meta") // TODO: int overflow?
+	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, maxStoreConcurrency)), "Collect Region Meta") // TODO: int overflow?
 
 	// TODO: optimize the ErroGroup when TiKV is panic
 	metaChan := make(chan StoreMeta, 1024)
@@ -232,7 +232,7 @@ func (recovery *Recovery) getTotalRegions() int {
 func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
 	eg, ectx := errgroup.WithContext(ctx)
 	totalRecoveredStores := len(recovery.recoveryPlan)
-	workers := utils.NewWorkerPool(uint(mathutil.Min(totalRecoveredStores, max_store_concurency)), "Recover Regions")
+	workers := utils.NewWorkerPool(uint(mathutil.Min(totalRecoveredStores, maxStoreConcurrency)), "Recover Regions")
 
 	for storeId, plan := range recovery.recoveryPlan {
 		if err := ectx.Err(); err != nil {
@@ -282,7 +282,7 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
 	eg, ectx := errgroup.WithContext(ctx)
 	totalStores := len(recovery.allStores)
-	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, max_store_concurency)), "wait apply")
+	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, maxStoreConcurrency)), "wait apply")
 
 	for _, store := range recovery.allStores {
 		if err := ectx.Err(); err != nil {
@@ -316,10 +316,9 @@
 
 // ResolveData uses a worker pool on all tikvs to delete all data with ts > resolvedTs
 func (recovery *Recovery) ResolveData(ctx context.Context, resolvedTs uint64) (err error) {
-
 	eg, ectx := errgroup.WithContext(ctx)
 	totalStores := len(recovery.allStores)
-	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, max_store_concurency)), "resolve data from tikv")
+	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, maxStoreConcurrency)), "resolve data from tikv")
 
 	// TODO: what if the resolved data take long time take long time?, it look we need some handling here, at least some retry may neccessary
 	for _, store := range recovery.allStores {
@@ -435,12 +434,12 @@
 			var ek = prefixEndKey(regions[p.regionId][0].EndKey)
 			var fk, fv interface{}
 			fk, _ = topo.Ceiling(sk)
-			// keysapce overlap sk within ceiling - fk
+			// keyspace overlap sk within ceiling - fk
 			if fk != nil && (keyEq(fk.([]byte), sk) || keyCmp(fk.([]byte), ek) < 0) {
 				continue
 			}
 
-			// keysapce overlap sk within floor - fk.end_key
+			// keyspace overlap sk within floor - fk.end_key
 			fk, fv = topo.Floor(sk)
 			if fk != nil && keyCmp(fv.(RegionEndKey).endKey, sk) > 0 {
 				continue

From 45fc1437c8e7c9530cc238730502139b94ec1511 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Fri, 2 Sep 2022 14:37:51 +0800
Subject: [PATCH 3/4] address comment

---
 br/pkg/restore/data.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index 24cd87f38f333..f27d844907efa 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -216,11 +216,11 @@ func (recovery *Recovery) getTotalRegions() int {
 	// Group region peer info by region id.
-	var regions = make(map[uint64][]struct{}, 0)
+	var regions = make(map[uint64]struct{}, 0)
 	for _, v := range recovery.storeMetas {
 		for _, m := range v.regionMetas {
-			if regions[m.RegionId] == nil {
-				regions[m.RegionId] = make([]struct{}, 0, len(recovery.allStores))
+			if _, ok := regions[m.RegionId]; !ok {
+				regions[m.RegionId] = struct{}{}
 			}
 		}
 	}

From 64319d737aea0cbd2c7823f1c034c7175e1638c5 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Fri, 2 Sep 2022 14:39:05 +0800
Subject: [PATCH 4/4] address comment

---
 br/pkg/task/restore_ebs_meta.go | 33 ++++++++++++++++-----------------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/br/pkg/task/restore_ebs_meta.go b/br/pkg/task/restore_ebs_meta.go
index 53eee45767a0a..0c614cc19ac37 100644
--- a/br/pkg/task/restore_ebs_meta.go
+++ b/br/pkg/task/restore_ebs_meta.go
@@ -173,25 +173,8 @@ func (h *restoreEBSMetaHelper) preRestore(ctx context.Context) error {
 	}
 	h.metaInfo = metaInfo
 
-	// stop scheduler before recover data
-	log.Info("starting to remove some PD schedulers")
-	restoreFunc, e := h.pdc.RemoveAllPDSchedulers(ctx)
-	if e != nil {
-		return errors.Trace(err)
-	}
-	defer func() {
-		if ctx.Err() != nil {
-			log.Warn("context canceled, doing clean work with background context")
-			ctx = context.Background()
-		}
-		if restoreE := restoreFunc(ctx); restoreE != nil {
-			log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
-		}
-	}()
-
 	// todo: check whether target cluster is compatible with the backup
 	// but cluster hasn't bootstrapped, we cannot get cluster version from pd now.
-
 	return nil
 }
 
@@ -228,6 +211,22 @@
 		return errors.Trace(err)
 	}
 
+	// stop scheduler before recover data
+	log.Info("starting to remove some PD schedulers")
+	restoreFunc, e := h.pdc.RemoveAllPDSchedulers(ctx)
+	if e != nil {
+		return errors.Trace(e)
+	}
+	defer func() {
+		if ctx.Err() != nil {
+			log.Warn("context canceled, doing clean work with background context")
+			ctx = context.Background()
+		}
+		if restoreE := restoreFunc(ctx); restoreE != nil {
+			log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
+		}
+	}()
+
 	storeCount := h.metaInfo.GetStoreCount()
 	progress := h.g.StartProgress(ctx, h.cmdName, int64(storeCount), !h.cfg.LogProgress)
 	defer progress.Close()
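
The recurring change in patch 1 is moving gRPC client creation from the dispatch loop into each worker closure, and rebinding the loop variables (recoveryPlan := plan, recoveryStoreId := storeId) before the closure captures them. Below is a minimal standalone sketch of that shape, not BR's actual code: conn, dial, work, and runPerStore are hypothetical stand-ins for *grpc.ClientConn, newRecoveryClient, and the per-store RPCs.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// conn stands in for *grpc.ClientConn in the real code.
type conn struct{ addr string }

func (c *conn) Close() error { return nil }

// dial stands in for Recovery.newRecoveryClient.
func dial(ctx context.Context, addr string) (*conn, error) {
	return &conn{addr: addr}, nil
}

// work stands in for the per-store RPC (ReadRegionMeta, WaitApply, ...).
func work(ctx context.Context, c *conn, storeID uint64) error {
	fmt.Println("store", storeID, "at", c.addr)
	return nil
}

func runPerStore(ctx context.Context, storeAddrs map[uint64]string) error {
	eg, ectx := errgroup.WithContext(ctx)
	for storeID, addr := range storeAddrs {
		// Rebind before capture: pre-Go 1.22, range variables are reused
		// across iterations, so a closure must not capture them directly.
		storeID, addr := storeID, addr
		eg.Go(func() error {
			// Dial inside the goroutine: the deferred Close fires when this
			// worker returns, not when runPerStore returns.
			c, err := dial(ectx, addr)
			if err != nil {
				return err
			}
			defer c.Close()
			return work(ectx, c, storeID)
		})
	}
	return eg.Wait()
}

func main() {
	if err := runPerStore(context.Background(), map[uint64]string{1: "tikv-1:20160"}); err != nil {
		fmt.Println("error:", err)
	}
}

With dialing inside the closure, each connection is opened only when its worker actually runs and is closed as soon as that worker finishes; in the pre-patch shape, every connection was opened up front and all the deferred closes waited for the enclosing function to return.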
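
Patch 4 moves the PD-scheduler pause out of preRestore and into restore, so the deferred re-enable brackets the recovery work itself. A minimal sketch of that pause-then-restore shape, assuming a hypothetical removeSchedulers in place of pdc.RemoveAllPDSchedulers; note the error handling returns e, the error actually being checked:

package main

import (
	"context"
	"log"
)

// removeSchedulers stands in for pdc.RemoveAllPDSchedulers: it pauses the
// schedulers and returns a callback that re-enables them.
func removeSchedulers(ctx context.Context) (func(context.Context) error, error) {
	log.Println("removing PD schedulers")
	return func(ctx context.Context) error {
		log.Println("restoring PD schedulers")
		return nil
	}, nil
}

func restoreData(ctx context.Context) error {
	restoreFunc, e := removeSchedulers(ctx)
	if e != nil {
		return e // return the error just checked, not an unrelated err
	}
	defer func() {
		if ctx.Err() != nil {
			// The main context may already be canceled; fall back to a fresh
			// one so scheduler restoration still gets a chance to run.
			ctx = context.Background()
		}
		if restoreE := restoreFunc(ctx); restoreE != nil {
			log.Printf("failed to restore schedulers, restore them manually: %v", restoreE)
		}
	}()

	// ... recover data while scheduling is paused ...
	return nil
}

func main() {
	_ = restoreData(context.Background())
}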