From 7fb2891436b6b87a0373ad8d615e3b7bb36fe132 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 12 Jul 2023 20:30:20 +0800 Subject: [PATCH 01/10] basic Signed-off-by: hillium --- br/pkg/restore/data.go | 108 ++++++++++++----- br/pkg/utils/storewatch/watching.go | 144 +++++++++++++++++++++++ br/pkg/utils/storewatch/watching_test.go | 137 +++++++++++++++++++++ 3 files changed, 357 insertions(+), 32 deletions(-) create mode 100644 br/pkg/utils/storewatch/watching.go create mode 100644 br/pkg/utils/storewatch/watching_test.go diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index eafc783a19a2f..c2f62582b9724 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -4,6 +4,7 @@ package restore import ( "context" "io" + "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" @@ -12,7 +13,9 @@ import ( "github.com/pingcap/tidb/br/pkg/common" "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/utils/storewatch" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/util/mathutil" tikvstore "github.com/tikv/client-go/v2/kv" @@ -48,6 +51,9 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor return totalRegions, errors.Trace(err) } + // Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode. + // This wathcher will retrigger `RecoveryRegions` for those stores. + recovery.SpawnTiKVShutDownWatchers(ctx) if err := recovery.RecoverRegions(ctx); err != nil { return totalRegions, errors.Trace(err) } @@ -213,6 +219,39 @@ func (recovery *Recovery) GetTotalRegions() int { return len(regions) } +func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error { + storeAddr := getStoreAddress(recovery.allStores, storeID) + recoveryClient, conn, err := recovery.newRecoveryClient(ctx, storeAddr) + if err != nil { + log.Error("create tikv client failed", zap.Uint64("store id", storeID)) + return errors.Trace(err) + } + defer conn.Close() + log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeID)) + stream, err := recoveryClient.RecoverRegion(ctx) + if err != nil { + log.Error("create recover region failed", zap.Uint64("store id", storeID)) + return errors.Trace(err) + } + + // for a TiKV, send the stream + for _, s := range plan { + if err = stream.Send(s); err != nil { + log.Error("send recover region failed", zap.Error(err)) + return errors.Trace(err) + } + } + + reply, err := stream.CloseAndRecv() + if err != nil { + log.Error("close the stream failed") + return errors.Trace(err) + } + recovery.progress.Inc() + log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId())) + return nil +} + // RecoverRegions send the recovery plan to recovery region (force leader etc) // only tikvs have regions whose have to recover be sent func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) { @@ -225,45 +264,50 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) { break } - storeAddr := getStoreAddress(recovery.allStores, storeId) - recoveryPlan := plan - recoveryStoreId := storeId workers.ApplyOnErrorGroup(eg, func() error { - recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr) - if err != nil { - log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId)) - return errors.Trace(err) - } - defer conn.Close() - log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId)) - stream, err := recoveryClient.RecoverRegion(ectx) - if err != nil { - log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId)) - return errors.Trace(err) - } - - // for a TiKV, send the stream - for _, s := range recoveryPlan { - if err = stream.Send(s); err != nil { - log.Error("send recover region failed", zap.Error(err)) - return errors.Trace(err) - } - } - - reply, err := stream.CloseAndRecv() - if err != nil { - log.Error("close the stream failed") - return errors.Trace(err) - } - recovery.progress.Inc() - log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId())) - return nil + return recovery.RecoverRegionOfStore(ectx, storeId, plan) }) } // Wait for all TiKV instances force leader and wait apply to last log. return eg.Wait() } +func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { + cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { + log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId())) + plan, ok := recovery.RecoveryPlan[s.GetId()] + if !ok { + log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", s.GetId())) + return + } + err := recovery.RecoverRegionOfStore(ctx, s.GetId(), plan) + if err != nil { + log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", s.GetId()), logutil.ShortError(err)) + } + }), storewatch.WithOnDisconnect(func(s *metapb.Store) { + log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress())) + }), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { + log.Info("Start to observing the state of store.", zap.Uint64("id", s.GetId())) + })) + watcher := storewatch.New(recovery.mgr.PDClient(), cb) + tick := time.NewTicker(30 * time.Second) + mainLoop := func() { + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + err := watcher.Step(ctx) + if err != nil { + log.Warn("Failed to step watcher.", logutil.ShortError(err)) + } + } + } + } + + go mainLoop() +} + // WaitApply send wait apply to all tikv ensure all region peer apply log into the last func (recovery *Recovery) WaitApply(ctx context.Context) (err error) { eg, ectx := errgroup.WithContext(ctx) diff --git a/br/pkg/utils/storewatch/watching.go b/br/pkg/utils/storewatch/watching.go new file mode 100644 index 0000000000000..565eeb369c741 --- /dev/null +++ b/br/pkg/utils/storewatch/watching.go @@ -0,0 +1,144 @@ +// package storewatch provides a `Watcher` type which allows +// the user to listen the events of lifetime of stores. +package storewatch + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" + pd "github.com/tikv/pd/client" +) + +// StoreMeta is the required interface for a watcher. +// It is striped from pd.Client. +type StoreMeta interface { + // GetAllStores gets all stores from pd. + // The store may expire later. Caller is responsible for caching and taking care + // of store change. + GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) +} + +// Callback will be called the supported event triggered. +type Callback interface { + OnNewStoreRegistered(store *metapb.Store) + OnDisconnect(store *metapb.Store) + OnReboot(store *metapb.Store) +} + +// DynCallback is a function based callback set. +type DynCallback struct { + onNewStoreRegistered func(*metapb.Store) + onDisconnect func(*metapb.Store) + onReboot func(*metapb.Store) +} + +// OnNewStoreRegistered will be called once new region added to be watched. +func (cb *DynCallback) OnNewStoreRegistered(store *metapb.Store) { + if cb.onNewStoreRegistered != nil { + cb.onNewStoreRegistered(store) + } +} + +// OnDisconnect will be called once the store is disconnected. +func (cb *DynCallback) OnDisconnect(store *metapb.Store) { + if cb.onDisconnect != nil { + cb.onDisconnect(store) + } +} + +// OnReboot will be called once the store is rebooted. +func (cb *DynCallback) OnReboot(store *metapb.Store) { + if cb.onReboot != nil { + cb.onReboot(store) + } +} + +// DynCallbackOpt is the option for DynCallback. +type DynCallbackOpt func(*DynCallback) + +// WithOnNewStoreRegistered adds a hook to the callback. +func WithOnNewStoreRegistered(f func(*metapb.Store)) DynCallbackOpt { + return func(cb *DynCallback) { + cb.onNewStoreRegistered = f + } +} + +// WithOnDisconnect adds a hook to the callback. +func WithOnDisconnect(f func(*metapb.Store)) DynCallbackOpt { + return func(cb *DynCallback) { + cb.onDisconnect = f + } +} + +// WithOnReboot adds a hook to the callback. +func WithOnReboot(f func(*metapb.Store)) DynCallbackOpt { + return func(cb *DynCallback) { + cb.onReboot = f + } +} + +// MakeCallback creates a callback with the given options. +// Allowed options: WithOnNewStoreRegistered, WithOnDisconnect, WithOnReboot. +func MakeCallback(opts ...DynCallbackOpt) Callback { + cb := &DynCallback{} + for _, opt := range opts { + opt(cb) + } + return cb +} + +// Watcher watches the lifetime of stores. +// generally it should be advanced by calling the `Step` call. +type Watcher struct { + cli StoreMeta + cb Callback + + lastStores map[uint64]*metapb.Store +} + +func New(cli StoreMeta, cb Callback) *Watcher { + return &Watcher{ + cli: cli, + cb: cb, + lastStores: make(map[uint64]*metapb.Store), + } +} + +func (w *Watcher) Step(ctx context.Context) error { + liveStores, err := w.cli.GetAllStores(ctx, pd.WithExcludeTombstone()) + if err != nil { + return errors.Annotate(err, "failed to update store list") + } + recorded := map[uint64]struct{}{} + for _, store := range liveStores { + w.updateStore(store) + recorded[store.GetId()] = struct{}{} + } + w.retain(recorded) + return nil +} + +// updateStore updates the current store. and call the hooks needed. +func (w *Watcher) updateStore(newStore *metapb.Store) { + lastStore, ok := w.lastStores[newStore.GetId()] + w.lastStores[newStore.GetId()] = newStore + if !ok { + w.cb.OnNewStoreRegistered(newStore) + return + } + if lastStore.GetState() == metapb.StoreState_Up && newStore.GetState() == metapb.StoreState_Offline { + w.cb.OnDisconnect(newStore) + } + if lastStore.StartTimestamp != newStore.StartTimestamp && newStore.GetState() == metapb.StoreState_Up { + w.cb.OnReboot(newStore) + } +} + +func (w *Watcher) retain(storeSet map[uint64]struct{}) { + for id := range w.lastStores { + if _, ok := storeSet[id]; !ok { + delete(w.lastStores, id) + } + } +} diff --git a/br/pkg/utils/storewatch/watching_test.go b/br/pkg/utils/storewatch/watching_test.go new file mode 100644 index 0000000000000..7d779392f2614 --- /dev/null +++ b/br/pkg/utils/storewatch/watching_test.go @@ -0,0 +1,137 @@ +package storewatch_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/utils/storewatch" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" +) + +type SequentialReturningStoreMeta struct { + sequence [][]*metapb.Store +} + +func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) storewatch.StoreMeta { + return &SequentialReturningStoreMeta{sequence: sequence} +} + +func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { + if len(s.sequence) == 0 { + return nil, fmt.Errorf("too many call to `GetAllStores` in test!") + } + stores := s.sequence[0] + s.sequence = s.sequence[1:] + return stores, nil +} + +func TestOnRegister(t *testing.T) { + // A sequence of store state that we should believe the store is offline. + seq := NewSequentialReturningStoreMeta([][]*metapb.Store{ + { + { + Id: 1, + State: metapb.StoreState_Up, + }, + }, + }) + callBackCalled := false + callback := storewatch.MakeCallback(storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { callBackCalled = true })) + ctx := context.Background() + + watcher := storewatch.New(seq, callback) + require.NoError(t, watcher.Step(ctx)) + require.True(t, callBackCalled) +} + +func TestOnOffline(t *testing.T) { + // A sequence of store state that we should believe the store is offline. + seq := NewSequentialReturningStoreMeta([][]*metapb.Store{ + { + { + Id: 1, + State: metapb.StoreState_Up, + }, + }, + { + { + Id: 1, + State: metapb.StoreState_Offline, + }, + }, + }) + callBackCalled := false + callback := storewatch.MakeCallback(storewatch.WithOnDisconnect(func(s *metapb.Store) { callBackCalled = true })) + ctx := context.Background() + + watcher := storewatch.New(seq, callback) + require.NoError(t, watcher.Step(ctx)) + require.NoError(t, watcher.Step(ctx)) + require.True(t, callBackCalled) +} + +func TestOnReboot(t *testing.T) { + // A sequence of store state that we should believe the store is offline. + seq := NewSequentialReturningStoreMeta([][]*metapb.Store{ + { + { + Id: 1, + State: metapb.StoreState_Up, + StartTimestamp: 1, + }, + }, + { + { + Id: 1, + State: metapb.StoreState_Offline, + StartTimestamp: 1, + }, + }, + { + { + Id: 1, + State: metapb.StoreState_Up, + StartTimestamp: 2, + }, + }, + }) + callBackCalled := false + callback := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { callBackCalled = true })) + ctx := context.Background() + + watcher := storewatch.New(seq, callback) + require.NoError(t, watcher.Step(ctx)) + require.NoError(t, watcher.Step(ctx)) + require.NoError(t, watcher.Step(ctx)) + require.True(t, callBackCalled) +} + +func TestOnRealStore(t *testing.T) { + t.SkipNow() + + req := require.New(t) + pdAddr := []string{"http://upd-1:2379"} + cli, err := pd.NewClient(pdAddr, pd.SecurityOption{}) + req.NoError(err) + cb := storewatch.MakeCallback( + storewatch.WithOnDisconnect(func(s *metapb.Store) { + fmt.Printf("Store %d at %s Disconnected\n", s.Id, s.Address) + }), + storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { + fmt.Printf("Store %d at %s Registered (ts = %d)\n", s.Id, s.Address, s.GetStartTimestamp()) + }), + storewatch.WithOnReboot(func(s *metapb.Store) { + fmt.Printf("Store %d at %s Rebooted (ts = %d)\n", s.Id, s.Address, s.StartTimestamp) + }), + ) + + watcher := storewatch.New(cli, cb) + for { + req.NoError(watcher.Step(context.Background())) + time.Sleep(5 * time.Second) + } +} From 4bbb14a561c74385ecfed2c4ec5840ce902a09c0 Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 17 Jul 2023 10:06:18 +0800 Subject: [PATCH 02/10] fix the loop variable Signed-off-by: hillium --- br/pkg/restore/data.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index c2f62582b9724..d5be1b3585af3 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -263,6 +263,8 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) { if err := ectx.Err(); err != nil { break } + storeId := storeId + plan := plan workers.ApplyOnErrorGroup(eg, func() error { return recovery.RecoverRegionOfStore(ectx, storeId, plan) From 854d4e2d2a64dab9da4ffff0f48533f946ac424d Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 20 Jul 2023 16:34:15 +0800 Subject: [PATCH 03/10] added retry Signed-off-by: hillium --- br/pkg/restore/data.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index d5be1b3585af3..8f94c4b05cbd4 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -275,17 +275,11 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) { } func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { + failedMark := map[uint64]struct{}{} cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId())) - plan, ok := recovery.RecoveryPlan[s.GetId()] - if !ok { - log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", s.GetId())) - return - } - err := recovery.RecoverRegionOfStore(ctx, s.GetId(), plan) - if err != nil { - log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", s.GetId()), logutil.ShortError(err)) - } + failedMark[s.Id] = struct{}{} + }), storewatch.WithOnDisconnect(func(s *metapb.Store) { log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress())) }), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { @@ -303,6 +297,20 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { if err != nil { log.Warn("Failed to step watcher.", logutil.ShortError(err)) } + for id := range failedMark { + plan, ok := recovery.RecoveryPlan[id] + if !ok { + log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id)) + continue + } + err := recovery.RecoverRegionOfStore(ctx, id, plan) + if err != nil { + log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", id), logutil.ShortError(err)) + continue + } + log.Info("Succeed to reload the leader in store.", zap.Uint64("id", id)) + delete(failedMark, id) + } } } } From 02efee9452b955edded6940104f83fa754bae8ba Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 26 Jul 2023 14:36:20 +0800 Subject: [PATCH 04/10] make bazel_prepare Signed-off-by: hillium --- br/pkg/restore/BUILD.bazel | 1 + br/pkg/utils/storewatch/BUILD.bazel | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 br/pkg/utils/storewatch/BUILD.bazel diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index bc98e083cd9e6..b294223564a3e 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -45,6 +45,7 @@ go_library( "//br/pkg/summary", "//br/pkg/utils", "//br/pkg/utils/iter", + "//br/pkg/utils/storewatch", "//br/pkg/version", "//config", "//ddl", diff --git a/br/pkg/utils/storewatch/BUILD.bazel b/br/pkg/utils/storewatch/BUILD.bazel new file mode 100644 index 0000000000000..69f206d3cb82f --- /dev/null +++ b/br/pkg/utils/storewatch/BUILD.bazel @@ -0,0 +1,27 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "storewatch", + srcs = ["watching.go"], + importpath = "github.com/pingcap/tidb/br/pkg/utils/storewatch", + visibility = ["//visibility:public"], + deps = [ + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_tikv_pd_client//:client", + ], +) + +go_test( + name = "storewatch_test", + timeout = "short", + srcs = ["watching_test.go"], + flaky = True, + shard_count = 4, + deps = [ + ":storewatch", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//:client", + ], +) From 1ccae14e64dc10ce016ff7a3d735ab4b85fb3c4a Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 26 Jul 2023 14:44:53 +0800 Subject: [PATCH 05/10] fix build Signed-off-by: hillium --- br/pkg/restore/data.go | 1 - br/pkg/utils/storewatch/watching_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index 8f94c4b05cbd4..210dd825bae51 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -279,7 +279,6 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId())) failedMark[s.Id] = struct{}{} - }), storewatch.WithOnDisconnect(func(s *metapb.Store) { log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress())) }), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { diff --git a/br/pkg/utils/storewatch/watching_test.go b/br/pkg/utils/storewatch/watching_test.go index 7d779392f2614..bab05e2cf7f6d 100644 --- a/br/pkg/utils/storewatch/watching_test.go +++ b/br/pkg/utils/storewatch/watching_test.go @@ -22,7 +22,7 @@ func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) storewatch.Stor func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { if len(s.sequence) == 0 { - return nil, fmt.Errorf("too many call to `GetAllStores` in test!") + return nil, fmt.Errorf("too many call to `GetAllStores` in test") } stores := s.sequence[0] s.sequence = s.sequence[1:] From 978cff55d21e7699554677e9b7d8ca8d7c6e7806 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 28 Jul 2023 16:14:25 +0800 Subject: [PATCH 06/10] address comments Signed-off-by: hillium --- br/pkg/conn/conn.go | 2 +- br/pkg/conn/util/util.go | 11 ++++++++++- br/pkg/restore/data.go | 8 ++++---- br/pkg/utils/storewatch/watching.go | 20 ++++++-------------- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 0f24857e954cb..1ecb1d732ac13 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -71,7 +71,7 @@ type Mgr struct { } func GetAllTiKVStoresWithRetry(ctx context.Context, - pdClient pd.Client, + pdClient util.StoreMeta, storeBehavior util.StoreBehavior, ) ([]*metapb.Store, error) { stores := make([]*metapb.Store, 0) diff --git a/br/pkg/conn/util/util.go b/br/pkg/conn/util/util.go index 0479030c1c5ac..58f400a231d25 100644 --- a/br/pkg/conn/util/util.go +++ b/br/pkg/conn/util/util.go @@ -28,11 +28,20 @@ const ( TiFlashOnly StoreBehavior = 2 ) +// StoreMeta is the required interface for a watcher. +// It is striped from pd.Client. +type StoreMeta interface { + // GetAllStores gets all stores from pd. + // The store may expire later. Caller is responsible for caching and taking care + // of store change. + GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) +} + // GetAllTiKVStores returns all TiKV stores registered to the PD client. The // stores must not be a tombstone and must never contain a label `engine=tiflash`. func GetAllTiKVStores( ctx context.Context, - pdClient pd.Client, + pdClient StoreMeta, storeBehavior StoreBehavior, ) ([]*metapb.Store, error) { // get all live stores. diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index 210dd825bae51..061c3114980ba 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -275,10 +275,10 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) { } func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { - failedMark := map[uint64]struct{}{} + rebootStores := map[uint64]struct{}{} cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId())) - failedMark[s.Id] = struct{}{} + rebootStores[s.Id] = struct{}{} }), storewatch.WithOnDisconnect(func(s *metapb.Store) { log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress())) }), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { @@ -296,7 +296,7 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { if err != nil { log.Warn("Failed to step watcher.", logutil.ShortError(err)) } - for id := range failedMark { + for id := range rebootStores { plan, ok := recovery.RecoveryPlan[id] if !ok { log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id)) @@ -308,7 +308,7 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { continue } log.Info("Succeed to reload the leader in store.", zap.Uint64("id", id)) - delete(failedMark, id) + delete(rebootStores, id) } } } diff --git a/br/pkg/utils/storewatch/watching.go b/br/pkg/utils/storewatch/watching.go index 565eeb369c741..67135803fecaf 100644 --- a/br/pkg/utils/storewatch/watching.go +++ b/br/pkg/utils/storewatch/watching.go @@ -7,18 +7,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - pd "github.com/tikv/pd/client" + "github.com/pingcap/tidb/br/pkg/conn" + "github.com/pingcap/tidb/br/pkg/conn/util" ) -// StoreMeta is the required interface for a watcher. -// It is striped from pd.Client. -type StoreMeta interface { - // GetAllStores gets all stores from pd. - // The store may expire later. Caller is responsible for caching and taking care - // of store change. - GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) -} - // Callback will be called the supported event triggered. type Callback interface { OnNewStoreRegistered(store *metapb.Store) @@ -91,13 +83,13 @@ func MakeCallback(opts ...DynCallbackOpt) Callback { // Watcher watches the lifetime of stores. // generally it should be advanced by calling the `Step` call. type Watcher struct { - cli StoreMeta + cli util.StoreMeta cb Callback lastStores map[uint64]*metapb.Store } -func New(cli StoreMeta, cb Callback) *Watcher { +func New(cli util.StoreMeta, cb Callback) *Watcher { return &Watcher{ cli: cli, cb: cb, @@ -106,7 +98,7 @@ func New(cli StoreMeta, cb Callback) *Watcher { } func (w *Watcher) Step(ctx context.Context) error { - liveStores, err := w.cli.GetAllStores(ctx, pd.WithExcludeTombstone()) + liveStores, err := conn.GetAllTiKVStoresWithRetry(ctx, w.cli, util.SkipTiFlash) if err != nil { return errors.Annotate(err, "failed to update store list") } @@ -130,7 +122,7 @@ func (w *Watcher) updateStore(newStore *metapb.Store) { if lastStore.GetState() == metapb.StoreState_Up && newStore.GetState() == metapb.StoreState_Offline { w.cb.OnDisconnect(newStore) } - if lastStore.StartTimestamp != newStore.StartTimestamp && newStore.GetState() == metapb.StoreState_Up { + if lastStore.StartTimestamp != newStore.StartTimestamp { w.cb.OnReboot(newStore) } } From 59da4495cde41a0b864254d831448dc14b042f96 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 28 Jul 2023 16:56:10 +0800 Subject: [PATCH 07/10] fix build Signed-off-by: hillium --- br/pkg/utils/storewatch/BUILD.bazel | 4 +++- br/pkg/utils/storewatch/watching_test.go | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/br/pkg/utils/storewatch/BUILD.bazel b/br/pkg/utils/storewatch/BUILD.bazel index 69f206d3cb82f..7478eef33ac02 100644 --- a/br/pkg/utils/storewatch/BUILD.bazel +++ b/br/pkg/utils/storewatch/BUILD.bazel @@ -6,9 +6,10 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/utils/storewatch", visibility = ["//visibility:public"], deps = [ + "//br/pkg/conn", + "//br/pkg/conn/util", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_tikv_pd_client//:client", ], ) @@ -20,6 +21,7 @@ go_test( shard_count = 4, deps = [ ":storewatch", + "//br/pkg/conn/util", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", "@com_github_tikv_pd_client//:client", diff --git a/br/pkg/utils/storewatch/watching_test.go b/br/pkg/utils/storewatch/watching_test.go index bab05e2cf7f6d..bb234e15907f3 100644 --- a/br/pkg/utils/storewatch/watching_test.go +++ b/br/pkg/utils/storewatch/watching_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/utils/storewatch" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" @@ -16,7 +17,7 @@ type SequentialReturningStoreMeta struct { sequence [][]*metapb.Store } -func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) storewatch.StoreMeta { +func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) util.StoreMeta { return &SequentialReturningStoreMeta{sequence: sequence} } From 7812c83dc079c32dfcd3b4365ef14b553eaa4105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 28 Jul 2023 21:20:37 +0800 Subject: [PATCH 08/10] Apply suggestions from code review --- br/pkg/utils/storewatch/watching_test.go | 25 ------------------------ 1 file changed, 25 deletions(-) diff --git a/br/pkg/utils/storewatch/watching_test.go b/br/pkg/utils/storewatch/watching_test.go index bb234e15907f3..ab1b329cb1ad9 100644 --- a/br/pkg/utils/storewatch/watching_test.go +++ b/br/pkg/utils/storewatch/watching_test.go @@ -111,28 +111,3 @@ func TestOnReboot(t *testing.T) { require.True(t, callBackCalled) } -func TestOnRealStore(t *testing.T) { - t.SkipNow() - - req := require.New(t) - pdAddr := []string{"http://upd-1:2379"} - cli, err := pd.NewClient(pdAddr, pd.SecurityOption{}) - req.NoError(err) - cb := storewatch.MakeCallback( - storewatch.WithOnDisconnect(func(s *metapb.Store) { - fmt.Printf("Store %d at %s Disconnected\n", s.Id, s.Address) - }), - storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { - fmt.Printf("Store %d at %s Registered (ts = %d)\n", s.Id, s.Address, s.GetStartTimestamp()) - }), - storewatch.WithOnReboot(func(s *metapb.Store) { - fmt.Printf("Store %d at %s Rebooted (ts = %d)\n", s.Id, s.Address, s.StartTimestamp) - }), - ) - - watcher := storewatch.New(cli, cb) - for { - req.NoError(watcher.Step(context.Background())) - time.Sleep(5 * time.Second) - } -} From 4ac0023cb932084676d64f0617616624e5e0730f Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 1 Aug 2023 14:19:11 +0800 Subject: [PATCH 09/10] make clippy happy Signed-off-by: hillium --- br/pkg/utils/storewatch/watching_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/br/pkg/utils/storewatch/watching_test.go b/br/pkg/utils/storewatch/watching_test.go index ab1b329cb1ad9..49ab4034bcfd7 100644 --- a/br/pkg/utils/storewatch/watching_test.go +++ b/br/pkg/utils/storewatch/watching_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/conn/util" @@ -110,4 +109,3 @@ func TestOnReboot(t *testing.T) { require.NoError(t, watcher.Step(ctx)) require.True(t, callBackCalled) } - From 071c0cae494689b6b58112189fa151349a394105 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 1 Aug 2023 14:55:13 +0800 Subject: [PATCH 10/10] make bazel_prepare Signed-off-by: hillium --- br/pkg/utils/storewatch/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/utils/storewatch/BUILD.bazel b/br/pkg/utils/storewatch/BUILD.bazel index 7478eef33ac02..640b7446d9517 100644 --- a/br/pkg/utils/storewatch/BUILD.bazel +++ b/br/pkg/utils/storewatch/BUILD.bazel @@ -18,7 +18,7 @@ go_test( timeout = "short", srcs = ["watching_test.go"], flaky = True, - shard_count = 4, + shard_count = 3, deps = [ ":storewatch", "//br/pkg/conn/util",