From dfc94990ed1de82dd2455bcf95c0f47a5a2760d9 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 28 Nov 2023 14:29:50 +0800 Subject: [PATCH 1/4] add region syncer client status Signed-off-by: Cabinfever_B --- pkg/syncer/client.go | 18 +++++++++++++++++- pkg/syncer/server.go | 15 ++++++++++++++- server/server.go | 6 ++++++ .../server/region_syncer/region_syncer_test.go | 18 ++++++++++++++++-- 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index f61ce320a74..d5f50fac9a5 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -19,6 +19,8 @@ import ( "time" "github.com/docker/go-units" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -77,6 +79,11 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C var regionGuide = core.GenerateRegionGuideFunc(false) +// IsRunningAsClient returns whether the region syncer client is running. +func (s *RegionSyncer) IsRunningAsClient() bool { + return s.streamingRunning.Load() && s.historyLoaded.Load() +} + // StartSyncWithLeader starts to sync with leader. func (s *RegionSyncer) StartSyncWithLeader(addr string) { s.wg.Add(1) @@ -89,6 +96,8 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { go func() { defer logutil.LogPanic() defer s.wg.Done() + defer s.streamingRunning.Store(false) + defer s.historyLoaded.Store(false) // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() regionStorage := s.server.GetStorage() @@ -132,6 +141,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } stream, err := s.syncRegion(ctx, conn) + failpoint.Inject("disableClientStreaming", func() { + err = errors.Errorf("no stream") + }) if err != nil { if ev, ok := status.FromError(err); ok { if ev.Code() == codes.Canceled { @@ -142,11 +154,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) continue } - + s.streamingRunning.Store(true) log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex())) for { resp, err := stream.Recv() if err != nil { + s.streamingRunning.Store(false) log.Error("region sync with leader meet error", errs.ZapError(errs.ErrGRPCRecv, err)) if err = stream.CloseSend(); err != nil { log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err)) @@ -155,6 +168,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { break } if s.history.GetNextIndex() != resp.GetStartIndex() { + log.Panic("test") log.Warn("server sync index not match the leader", zap.String("server", s.server.Name()), zap.Uint64("own", s.history.GetNextIndex()), @@ -212,6 +226,8 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { _ = regionStorage.DeleteRegion(old.GetMeta()) } } + // mark the client as running status when it finished the first history region sync. + s.historyLoaded.Store(true) } } }() diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 7d339e75dbe..724e4f0e268 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -18,6 +18,7 @@ import ( "context" "io" "sync" + "sync/atomic" "time" "github.com/docker/go-units" @@ -83,6 +84,9 @@ type RegionSyncer struct { history *historyBuffer limit *ratelimit.RateLimiter tlsConfig *grpcutil.TLSConfig + // status when as client + streamingRunning atomic.Bool + historyLoaded atomic.Bool } // NewRegionSyncer returns a region syncer. @@ -228,7 +232,16 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync if s.history.GetNextIndex() == startIndex { log.Info("requested server has already in sync with server", zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex)) - return nil + // still send a response to follower to show the history region sync. + resp := &pdpb.SyncRegionResponse{ + Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()}, + Regions: nil, + StartIndex: startIndex, + RegionStats: nil, + RegionLeaders: nil, + Buckets: nil, + } + return stream.Send(resp) } // do full synchronization if startIndex == 0 { diff --git a/server/server.go b/server/server.go index 76893c24388..43daa65d844 100644 --- a/server/server.go +++ b/server/server.go @@ -1363,6 +1363,12 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { return s.cluster } +// DirectlyGetRaftCluster returns raft cluster directly. +// Only used for test. +func (s *Server) DirectlyGetRaftCluster() *cluster.RaftCluster { + return s.cluster +} + // GetCluster gets cluster. func (s *Server) GetCluster() *metapb.Cluster { return &metapb.Cluster{ diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 6521432c0dc..7e4b31b6620 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -47,21 +47,35 @@ func (i *idAllocator) alloc() uint64 { func TestRegionSyncer(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) - defer cancel() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/regionStorageFastFlush", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/noFastExitSync", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/disableClientStreaming", `return(true)`)) cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) - defer cluster.Destroy() + defer func() { + cluster.Destroy() + cancel() + }() re.NoError(err) re.NoError(cluster.RunInitialServers()) cluster.WaitLeader() leaderServer := cluster.GetLeaderServer() + re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc) + + testutil.Eventually(re, func() bool { + follower := cluster.GetServer(cluster.GetFollower()) + return !follower.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() + }) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/syncer/disableClientStreaming")) re.True(cluster.WaitRegionSyncerClientsReady(2)) + testutil.Eventually(re, func() bool { + follower := cluster.GetServer(cluster.GetFollower()) + return follower.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() + }) regionLen := 110 regions := initRegions(regionLen) From 786e6d7146061021f83cf95c1253a5dcd3d367da Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 28 Nov 2023 14:35:18 +0800 Subject: [PATCH 2/4] add test Signed-off-by: Cabinfever_B add test Signed-off-by: Cabinfever_B --- tests/server/region_syncer/region_syncer_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 7e4b31b6620..8607b209f19 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -65,16 +65,15 @@ func TestRegionSyncer(t *testing.T) { re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc) + followerServer := cluster.GetServer(cluster.GetFollower()) testutil.Eventually(re, func() bool { - follower := cluster.GetServer(cluster.GetFollower()) - return !follower.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() + return !followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/syncer/disableClientStreaming")) re.True(cluster.WaitRegionSyncerClientsReady(2)) testutil.Eventually(re, func() bool { - follower := cluster.GetServer(cluster.GetFollower()) - return follower.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() + return followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() }) regionLen := 110 @@ -133,7 +132,6 @@ func TestRegionSyncer(t *testing.T) { time.Sleep(4 * time.Second) // test All regions have been synchronized to the cache of followerServer - followerServer := cluster.GetServer(cluster.GetFollower()) re.NotNil(followerServer) cacheRegions := leaderServer.GetServer().GetBasicCluster().GetRegions() re.Len(cacheRegions, regionLen) @@ -155,6 +153,9 @@ func TestRegionSyncer(t *testing.T) { re.NoError(err) cluster.WaitLeader() leaderServer = cluster.GetLeaderServer() + testutil.Eventually(re, func() bool { + return !leaderServer.GetServer().GetRaftCluster().GetRegionSyncer().IsRunningAsClient() + }) re.NotNil(leaderServer) loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions() re.Len(loadRegions, regionLen) From 73064fcc7fbe7dce5023ccfbb8bfa935b37fa9ad Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 28 Nov 2023 17:48:32 +0800 Subject: [PATCH 3/4] address comment Signed-off-by: Cabinfever_B --- pkg/syncer/client.go | 5 ++--- tests/server/region_syncer/region_syncer_test.go | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index d5f50fac9a5..1c6125d0330 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -79,8 +79,8 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C var regionGuide = core.GenerateRegionGuideFunc(false) -// IsRunningAsClient returns whether the region syncer client is running. -func (s *RegionSyncer) IsRunningAsClient() bool { +// IsRunning returns whether the region syncer client is running. +func (s *RegionSyncer) IsRunning() bool { return s.streamingRunning.Load() && s.historyLoaded.Load() } @@ -168,7 +168,6 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { break } if s.history.GetNextIndex() != resp.GetStartIndex() { - log.Panic("test") log.Warn("server sync index not match the leader", zap.String("server", s.server.Name()), zap.Uint64("own", s.history.GetNextIndex()), diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 8607b209f19..87b5c0683c7 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -68,12 +68,12 @@ func TestRegionSyncer(t *testing.T) { followerServer := cluster.GetServer(cluster.GetFollower()) testutil.Eventually(re, func() bool { - return !followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() + return !followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunning() }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/syncer/disableClientStreaming")) re.True(cluster.WaitRegionSyncerClientsReady(2)) testutil.Eventually(re, func() bool { - return followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunningAsClient() + return followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunning() }) regionLen := 110 @@ -154,7 +154,7 @@ func TestRegionSyncer(t *testing.T) { cluster.WaitLeader() leaderServer = cluster.GetLeaderServer() testutil.Eventually(re, func() bool { - return !leaderServer.GetServer().GetRaftCluster().GetRegionSyncer().IsRunningAsClient() + return !leaderServer.GetServer().GetRaftCluster().GetRegionSyncer().IsRunning() }) re.NotNil(leaderServer) loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions() From 896abe1be8cbcc74c7389301f248539f11f1aecc Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 29 Nov 2023 15:11:49 +0800 Subject: [PATCH 4/4] address comment Signed-off-by: Cabinfever_B --- pkg/syncer/client.go | 6 ++---- pkg/syncer/server.go | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 1c6125d0330..00dd8c5107d 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -81,7 +81,7 @@ var regionGuide = core.GenerateRegionGuideFunc(false) // IsRunning returns whether the region syncer client is running. func (s *RegionSyncer) IsRunning() bool { - return s.streamingRunning.Load() && s.historyLoaded.Load() + return s.streamingRunning.Load() } // StartSyncWithLeader starts to sync with leader. @@ -97,7 +97,6 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { defer logutil.LogPanic() defer s.wg.Done() defer s.streamingRunning.Store(false) - defer s.historyLoaded.Store(false) // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() regionStorage := s.server.GetStorage() @@ -154,7 +153,6 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) continue } - s.streamingRunning.Store(true) log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex())) for { resp, err := stream.Recv() @@ -226,7 +224,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } } // mark the client as running status when it finished the first history region sync. - s.historyLoaded.Store(true) + s.streamingRunning.Store(true) } } }() diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 724e4f0e268..4fb38614de0 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -86,7 +86,6 @@ type RegionSyncer struct { tlsConfig *grpcutil.TLSConfig // status when as client streamingRunning atomic.Bool - historyLoaded atomic.Bool } // NewRegionSyncer returns a region syncer.