Skip to content

Commit

Permalink
snap_restore: resend recover_region while there are TiKV restarts (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Aug 1, 2023
1 parent 2d23240 commit 3f9f825
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 34 deletions.
2 changes: 1 addition & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Mgr struct {
}

func GetAllTiKVStoresWithRetry(ctx context.Context,
pdClient pd.Client,
pdClient util.StoreMeta,
storeBehavior util.StoreBehavior,
) ([]*metapb.Store, error) {
stores := make([]*metapb.Store, 0)
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ const (
TiFlashOnly StoreBehavior = 2
)

// StoreMeta is the required interface for a watcher.
// It is striped from pd.Client.
type StoreMeta interface {
// GetAllStores gets all stores from pd.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
}

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
pdClient StoreMeta,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//br/pkg/utils/storewatch",
"//br/pkg/version",
"//config",
"//ddl",
Expand Down
117 changes: 85 additions & 32 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package restore
import (
"context"
"io"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -12,7 +13,9 @@ import (
"github.com/pingcap/tidb/br/pkg/common"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/storewatch"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/util/mathutil"
tikvstore "github.com/tikv/client-go/v2/kv"
Expand Down Expand Up @@ -48,6 +51,9 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor
return totalRegions, errors.Trace(err)
}

// Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode.
// This wathcher will retrigger `RecoveryRegions` for those stores.
recovery.SpawnTiKVShutDownWatchers(ctx)
if err := recovery.RecoverRegions(ctx); err != nil {
return totalRegions, errors.Trace(err)
}
Expand Down Expand Up @@ -213,6 +219,39 @@ func (recovery *Recovery) GetTotalRegions() int {
return len(regions)
}

func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
storeAddr := getStoreAddress(recovery.allStores, storeID)
recoveryClient, conn, err := recovery.newRecoveryClient(ctx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("store id", storeID))
return errors.Trace(err)
}
defer conn.Close()
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeID))
stream, err := recoveryClient.RecoverRegion(ctx)
if err != nil {
log.Error("create recover region failed", zap.Uint64("store id", storeID))
return errors.Trace(err)
}

// for a TiKV, send the stream
for _, s := range plan {
if err = stream.Send(s); err != nil {
log.Error("send recover region failed", zap.Error(err))
return errors.Trace(err)
}
}

reply, err := stream.CloseAndRecv()
if err != nil {
log.Error("close the stream failed")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
return nil
}

// RecoverRegions send the recovery plan to recovery region (force leader etc)
// only tikvs have regions whose have to recover be sent
func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
Expand All @@ -224,46 +263,60 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
if err := ectx.Err(); err != nil {
break
}
storeId := storeId
plan := plan

storeAddr := getStoreAddress(recovery.allStores, storeId)
recoveryPlan := plan
recoveryStoreId := storeId
workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}
defer conn.Close()
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId))
stream, err := recoveryClient.RecoverRegion(ectx)
if err != nil {
log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}

// for a TiKV, send the stream
for _, s := range recoveryPlan {
if err = stream.Send(s); err != nil {
log.Error("send recover region failed", zap.Error(err))
return errors.Trace(err)
}
}

reply, err := stream.CloseAndRecv()
if err != nil {
log.Error("close the stream failed")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
return nil
return recovery.RecoverRegionOfStore(ectx, storeId, plan)
})
}
// Wait for all TiKV instances force leader and wait apply to last log.
return eg.Wait()
}

func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
rebootStores := map[uint64]struct{}{}
cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) {
log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId()))
rebootStores[s.Id] = struct{}{}
}), storewatch.WithOnDisconnect(func(s *metapb.Store) {
log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress()))
}), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) {
log.Info("Start to observing the state of store.", zap.Uint64("id", s.GetId()))
}))
watcher := storewatch.New(recovery.mgr.PDClient(), cb)
tick := time.NewTicker(30 * time.Second)
mainLoop := func() {
for {
select {
case <-ctx.Done():
return
case <-tick.C:
err := watcher.Step(ctx)
if err != nil {
log.Warn("Failed to step watcher.", logutil.ShortError(err))
}
for id := range rebootStores {
plan, ok := recovery.RecoveryPlan[id]
if !ok {
log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id))
continue
}
err := recovery.RecoverRegionOfStore(ctx, id, plan)
if err != nil {
log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", id), logutil.ShortError(err))
continue
}
log.Info("Succeed to reload the leader in store.", zap.Uint64("id", id))
delete(rebootStores, id)
}
}
}
}

go mainLoop()
}

// WaitApply send wait apply to all tikv ensure all region peer apply log into the last
func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
eg, ectx := errgroup.WithContext(ctx)
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/utils/storewatch/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "storewatch",
srcs = ["watching.go"],
importpath = "github.com/pingcap/tidb/br/pkg/utils/storewatch",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/conn",
"//br/pkg/conn/util",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/metapb",
],
)

go_test(
name = "storewatch_test",
timeout = "short",
srcs = ["watching_test.go"],
flaky = True,
shard_count = 3,
deps = [
":storewatch",
"//br/pkg/conn/util",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
],
)
136 changes: 136 additions & 0 deletions br/pkg/utils/storewatch/watching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// package storewatch provides a `Watcher` type which allows
// the user to listen the events of lifetime of stores.
package storewatch

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
)

// Callback will be called the supported event triggered.
type Callback interface {
OnNewStoreRegistered(store *metapb.Store)
OnDisconnect(store *metapb.Store)
OnReboot(store *metapb.Store)
}

// DynCallback is a function based callback set.
type DynCallback struct {
onNewStoreRegistered func(*metapb.Store)
onDisconnect func(*metapb.Store)
onReboot func(*metapb.Store)
}

// OnNewStoreRegistered will be called once new region added to be watched.
func (cb *DynCallback) OnNewStoreRegistered(store *metapb.Store) {
if cb.onNewStoreRegistered != nil {
cb.onNewStoreRegistered(store)
}
}

// OnDisconnect will be called once the store is disconnected.
func (cb *DynCallback) OnDisconnect(store *metapb.Store) {
if cb.onDisconnect != nil {
cb.onDisconnect(store)
}
}

// OnReboot will be called once the store is rebooted.
func (cb *DynCallback) OnReboot(store *metapb.Store) {
if cb.onReboot != nil {
cb.onReboot(store)
}
}

// DynCallbackOpt is the option for DynCallback.
type DynCallbackOpt func(*DynCallback)

// WithOnNewStoreRegistered adds a hook to the callback.
func WithOnNewStoreRegistered(f func(*metapb.Store)) DynCallbackOpt {
return func(cb *DynCallback) {
cb.onNewStoreRegistered = f
}
}

// WithOnDisconnect adds a hook to the callback.
func WithOnDisconnect(f func(*metapb.Store)) DynCallbackOpt {
return func(cb *DynCallback) {
cb.onDisconnect = f
}
}

// WithOnReboot adds a hook to the callback.
func WithOnReboot(f func(*metapb.Store)) DynCallbackOpt {
return func(cb *DynCallback) {
cb.onReboot = f
}
}

// MakeCallback creates a callback with the given options.
// Allowed options: WithOnNewStoreRegistered, WithOnDisconnect, WithOnReboot.
func MakeCallback(opts ...DynCallbackOpt) Callback {
cb := &DynCallback{}
for _, opt := range opts {
opt(cb)
}
return cb
}

// Watcher watches the lifetime of stores.
// generally it should be advanced by calling the `Step` call.
type Watcher struct {
cli util.StoreMeta
cb Callback

lastStores map[uint64]*metapb.Store
}

func New(cli util.StoreMeta, cb Callback) *Watcher {
return &Watcher{
cli: cli,
cb: cb,
lastStores: make(map[uint64]*metapb.Store),
}
}

func (w *Watcher) Step(ctx context.Context) error {
liveStores, err := conn.GetAllTiKVStoresWithRetry(ctx, w.cli, util.SkipTiFlash)
if err != nil {
return errors.Annotate(err, "failed to update store list")
}
recorded := map[uint64]struct{}{}
for _, store := range liveStores {
w.updateStore(store)
recorded[store.GetId()] = struct{}{}
}
w.retain(recorded)
return nil
}

// updateStore updates the current store. and call the hooks needed.
func (w *Watcher) updateStore(newStore *metapb.Store) {
lastStore, ok := w.lastStores[newStore.GetId()]
w.lastStores[newStore.GetId()] = newStore
if !ok {
w.cb.OnNewStoreRegistered(newStore)
return
}
if lastStore.GetState() == metapb.StoreState_Up && newStore.GetState() == metapb.StoreState_Offline {
w.cb.OnDisconnect(newStore)
}
if lastStore.StartTimestamp != newStore.StartTimestamp {
w.cb.OnReboot(newStore)
}
}

func (w *Watcher) retain(storeSet map[uint64]struct{}) {
for id := range w.lastStores {
if _, ok := storeSet[id]; !ok {
delete(w.lastStores, id)
}
}
}
Loading

0 comments on commit 3f9f825

Please sign in to comment.