Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snap_restore: resend recover_region while there are TiKV restarts #45361

Merged
merged 10 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Mgr struct {
}

func GetAllTiKVStoresWithRetry(ctx context.Context,
pdClient pd.Client,
pdClient util.StoreMeta,
storeBehavior util.StoreBehavior,
) ([]*metapb.Store, error) {
stores := make([]*metapb.Store, 0)
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ const (
TiFlashOnly StoreBehavior = 2
)

// StoreMeta is the required interface for a watcher.
// It is striped from pd.Client.
type StoreMeta interface {
// GetAllStores gets all stores from pd.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
}

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
pdClient StoreMeta,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//br/pkg/utils/storewatch",
"//br/pkg/version",
"//config",
"//ddl",
Expand Down
117 changes: 85 additions & 32 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package restore
import (
"context"
"io"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -12,7 +13,9 @@ import (
"github.com/pingcap/tidb/br/pkg/common"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/storewatch"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/util/mathutil"
tikvstore "github.com/tikv/client-go/v2/kv"
Expand Down Expand Up @@ -48,6 +51,9 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor
return totalRegions, errors.Trace(err)
}

// Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode.
// This wathcher will retrigger `RecoveryRegions` for those stores.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This wathcher will retrigger `RecoveryRegions` for those stores.
// This watcher will re-trigger `RecoveryRegions` for those stores.

recovery.SpawnTiKVShutDownWatchers(ctx)
if err := recovery.RecoverRegions(ctx); err != nil {
return totalRegions, errors.Trace(err)
}
Expand Down Expand Up @@ -213,6 +219,39 @@ func (recovery *Recovery) GetTotalRegions() int {
return len(regions)
}

func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
func (recovery *Recovery) RecoverRegionsOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {

storeAddr := getStoreAddress(recovery.allStores, storeID)
recoveryClient, conn, err := recovery.newRecoveryClient(ctx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("store id", storeID))
return errors.Trace(err)
}
defer conn.Close()
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeID))
stream, err := recoveryClient.RecoverRegion(ctx)
if err != nil {
log.Error("create recover region failed", zap.Uint64("store id", storeID))
return errors.Trace(err)
}

// for a TiKV, send the stream
for _, s := range plan {
if err = stream.Send(s); err != nil {
log.Error("send recover region failed", zap.Error(err))
return errors.Trace(err)
}
}

reply, err := stream.CloseAndRecv()
if err != nil {
log.Error("close the stream failed")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
return nil
}

// RecoverRegions send the recovery plan to recovery region (force leader etc)
// only tikvs have regions whose have to recover be sent
func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
Expand All @@ -224,46 +263,60 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
if err := ectx.Err(); err != nil {
break
}
storeId := storeId
plan := plan

storeAddr := getStoreAddress(recovery.allStores, storeId)
recoveryPlan := plan
recoveryStoreId := storeId
workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}
defer conn.Close()
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId))
stream, err := recoveryClient.RecoverRegion(ectx)
if err != nil {
log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}

// for a TiKV, send the stream
for _, s := range recoveryPlan {
if err = stream.Send(s); err != nil {
log.Error("send recover region failed", zap.Error(err))
return errors.Trace(err)
}
}

reply, err := stream.CloseAndRecv()
if err != nil {
log.Error("close the stream failed")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
return nil
return recovery.RecoverRegionOfStore(ectx, storeId, plan)
})
}
// Wait for all TiKV instances force leader and wait apply to last log.
return eg.Wait()
}

func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
rebootStores := map[uint64]struct{}{}
cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) {
log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId()))
rebootStores[s.Id] = struct{}{}
}), storewatch.WithOnDisconnect(func(s *metapb.Store) {
log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress()))
}), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) {
log.Info("Start to observing the state of store.", zap.Uint64("id", s.GetId()))
}))
watcher := storewatch.New(recovery.mgr.PDClient(), cb)
tick := time.NewTicker(30 * time.Second)
mainLoop := func() {
for {
select {
case <-ctx.Done():
return
case <-tick.C:
err := watcher.Step(ctx)
if err != nil {
log.Warn("Failed to step watcher.", logutil.ShortError(err))
}
for id := range rebootStores {
plan, ok := recovery.RecoveryPlan[id]
if !ok {
log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id))
continue
}
err := recovery.RecoverRegionOfStore(ctx, id, plan)
if err != nil {
log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", id), logutil.ShortError(err))
continue
}
log.Info("Succeed to reload the leader in store.", zap.Uint64("id", id))
delete(rebootStores, id)
}
}
}
}

go mainLoop()
}

// WaitApply send wait apply to all tikv ensure all region peer apply log into the last
func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
eg, ectx := errgroup.WithContext(ctx)
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/utils/storewatch/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "storewatch",
srcs = ["watching.go"],
importpath = "github.com/pingcap/tidb/br/pkg/utils/storewatch",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/conn",
"//br/pkg/conn/util",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/metapb",
],
)

go_test(
name = "storewatch_test",
timeout = "short",
srcs = ["watching_test.go"],
flaky = True,
shard_count = 3,
deps = [
":storewatch",
"//br/pkg/conn/util",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
],
)
136 changes: 136 additions & 0 deletions br/pkg/utils/storewatch/watching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// package storewatch provides a `Watcher` type which allows
// the user to listen the events of lifetime of stores.
package storewatch

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
)

// Callback will be called the supported event triggered.
type Callback interface {
OnNewStoreRegistered(store *metapb.Store)
OnDisconnect(store *metapb.Store)
OnReboot(store *metapb.Store)
}

// DynCallback is a function based callback set.
type DynCallback struct {
onNewStoreRegistered func(*metapb.Store)
onDisconnect func(*metapb.Store)
onReboot func(*metapb.Store)
}

// OnNewStoreRegistered will be called once new region added to be watched.
func (cb *DynCallback) OnNewStoreRegistered(store *metapb.Store) {
if cb.onNewStoreRegistered != nil {
cb.onNewStoreRegistered(store)
}
}

// OnDisconnect will be called once the store is disconnected.
func (cb *DynCallback) OnDisconnect(store *metapb.Store) {
if cb.onDisconnect != nil {
cb.onDisconnect(store)
}
}

// OnReboot will be called once the store is rebooted.
func (cb *DynCallback) OnReboot(store *metapb.Store) {
if cb.onReboot != nil {
cb.onReboot(store)
}
}

// DynCallbackOpt is the option for DynCallback.
type DynCallbackOpt func(*DynCallback)

// WithOnNewStoreRegistered adds a hook to the callback.
func WithOnNewStoreRegistered(f func(*metapb.Store)) DynCallbackOpt {
return func(cb *DynCallback) {
cb.onNewStoreRegistered = f
}
}

// WithOnDisconnect adds a hook to the callback.
func WithOnDisconnect(f func(*metapb.Store)) DynCallbackOpt {
return func(cb *DynCallback) {
cb.onDisconnect = f
}
}

// WithOnReboot adds a hook to the callback.
func WithOnReboot(f func(*metapb.Store)) DynCallbackOpt {
return func(cb *DynCallback) {
cb.onReboot = f
}
}

// MakeCallback creates a callback with the given options.
// Allowed options: WithOnNewStoreRegistered, WithOnDisconnect, WithOnReboot.
func MakeCallback(opts ...DynCallbackOpt) Callback {
cb := &DynCallback{}
for _, opt := range opts {
opt(cb)
}
return cb
}

// Watcher watches the lifetime of stores.
// generally it should be advanced by calling the `Step` call.
type Watcher struct {
cli util.StoreMeta
cb Callback

lastStores map[uint64]*metapb.Store
}

func New(cli util.StoreMeta, cb Callback) *Watcher {
return &Watcher{
cli: cli,
cb: cb,
lastStores: make(map[uint64]*metapb.Store),
}
}

func (w *Watcher) Step(ctx context.Context) error {
liveStores, err := conn.GetAllTiKVStoresWithRetry(ctx, w.cli, util.SkipTiFlash)
if err != nil {
return errors.Annotate(err, "failed to update store list")
}
recorded := map[uint64]struct{}{}
for _, store := range liveStores {
w.updateStore(store)
recorded[store.GetId()] = struct{}{}
}
w.retain(recorded)
return nil
}

// updateStore updates the current store. and call the hooks needed.
func (w *Watcher) updateStore(newStore *metapb.Store) {
lastStore, ok := w.lastStores[newStore.GetId()]
w.lastStores[newStore.GetId()] = newStore
if !ok {
w.cb.OnNewStoreRegistered(newStore)
return
}
if lastStore.GetState() == metapb.StoreState_Up && newStore.GetState() == metapb.StoreState_Offline {
w.cb.OnDisconnect(newStore)
}
if lastStore.StartTimestamp != newStore.StartTimestamp {
w.cb.OnReboot(newStore)
}
}

func (w *Watcher) retain(storeSet map[uint64]struct{}) {
for id := range w.lastStores {
if _, ok := storeSet[id]; !ok {
delete(w.lastStores, id)
}
}
}
Loading