syncer: add region syncer client status (#7461)
ref #7431

Signed-off-by: Cabinfever_B <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
CabinfeverB and ti-chi-bot[bot] authored Nov 30, 2023
1 parent 862eee1 commit 5aacd66
Showing 4 changed files with 51 additions and 5 deletions.
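In short: this commit adds a client-side status flag to the region syncer. A new `streamingRunning` atomic.Bool on `RegionSyncer`, exposed through `IsRunning()`, is set once a round of history region sync completes and cleared when the stream errors out or the sync goroutine exits. So that the flag can turn true even when a follower is already fully in sync, the leader now sends a single empty `SyncRegionResponse` instead of returning early; a `disableClientStreaming` failpoint and a test-only `DirectlyGetRaftCluster` accessor let the test observe both states.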
15 changes: 14 additions & 1 deletion pkg/syncer/client.go
@@ -19,6 +19,8 @@ import (
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
@@ -77,6 +79,11 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C

var regionGuide = core.GenerateRegionGuideFunc(false)

// IsRunning returns whether the region syncer client is running.
func (s *RegionSyncer) IsRunning() bool {
return s.streamingRunning.Load()
}

// StartSyncWithLeader starts to sync with leader.
func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.wg.Add(1)
@@ -89,6 +96,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
go func() {
defer logutil.LogPanic()
defer s.wg.Done()
defer s.streamingRunning.Store(false)
// used to load region from kv storage to cache storage.
bc := s.server.GetBasicCluster()
regionStorage := s.server.GetStorage()
@@ -132,6 +140,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
}

stream, err := s.syncRegion(ctx, conn)
failpoint.Inject("disableClientStreaming", func() {
err = errors.Errorf("no stream")
})
if err != nil {
if ev, ok := status.FromError(err); ok {
if ev.Code() == codes.Canceled {
@@ -142,11 +153,11 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
time.Sleep(time.Second)
continue
}

log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
for {
resp, err := stream.Recv()
if err != nil {
s.streamingRunning.Store(false)
log.Error("region sync with leader meet error", errs.ZapError(errs.ErrGRPCRecv, err))
if err = stream.CloseSend(); err != nil {
log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err))
@@ -212,6 +223,8 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
_ = regionStorage.DeleteRegion(old.GetMeta())
}
}
// mark the client as running once it has finished the first round of history region sync.
s.streamingRunning.Store(true)
}
}
}()
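The lifecycle of the flag is easiest to see in isolation. Below is a minimal, self-contained sketch of the pattern (not the PD code itself): an `atomic.Bool` is cleared whenever the stream breaks or the goroutine exits, set again after each completed sync round, and read through an `IsRunning()` accessor. The `runningFlag`, `syncLoop`, and `recv` names are hypothetical stand-ins for `RegionSyncer`, the goroutine inside `StartSyncWithLeader`, and `stream.Recv()`.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// runningFlag mirrors the streamingRunning field added to RegionSyncer.
type runningFlag struct {
	streamingRunning atomic.Bool
}

// IsRunning reports whether the streaming loop is currently healthy.
func (f *runningFlag) IsRunning() bool { return f.streamingRunning.Load() }

// syncLoop is a stand-in for the sync goroutine in StartSyncWithLeader.
func (f *runningFlag) syncLoop(recv func() error) {
	defer f.streamingRunning.Store(false) // always reset on goroutine exit
	for {
		if err := recv(); err != nil {
			f.streamingRunning.Store(false) // stream broke: mark not running
			return
		}
		f.streamingRunning.Store(true) // a sync round finished: mark running
	}
}

func main() {
	var f runningFlag
	rounds := 0
	f.syncLoop(func() error {
		rounds++
		if rounds > 2 {
			return errors.New("stream closed")
		}
		return nil
	})
	fmt.Println(f.IsRunning()) // false: the loop has exited
}
```

One consequence, visible in the diff above: the flag only turns true after a response has actually been received and applied, so a follower that never receives a single message would report not running forever — which is exactly what the server-side change below addresses.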
14 changes: 13 additions & 1 deletion pkg/syncer/server.go
@@ -18,6 +18,7 @@ import (
"context"
"io"
"sync"
"sync/atomic"
"time"

"github.com/docker/go-units"
@@ -83,6 +84,8 @@ type RegionSyncer struct {
history *historyBuffer
limit *ratelimit.RateLimiter
tlsConfig *grpcutil.TLSConfig
// streamingRunning is the running status when this server acts as a syncer client.
streamingRunning atomic.Bool
}

// NewRegionSyncer returns a region syncer.
@@ -228,7 +231,16 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync
if s.history.GetNextIndex() == startIndex {
log.Info("requested server has already in sync with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex))
return nil
// still send a response to the follower so it can observe that the history region sync is complete.
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: nil,
StartIndex: startIndex,
RegionStats: nil,
RegionLeaders: nil,
Buckets: nil,
}
return stream.Send(resp)
}
// do full synchronization
if startIndex == 0 {
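This is the counterpart of the client-side flag. Previously, when the follower's `startIndex` already matched, `syncHistoryRegion` returned `nil` without sending anything, and the follower would receive nothing until some region actually changed — so on an idle cluster `IsRunning()` could stay false indefinitely. Sending one response with only `Header` and `StartIndex` populated costs a single message and lets an already-in-sync follower complete its first round immediately.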
6 changes: 6 additions & 0 deletions server/server.go
@@ -1363,6 +1363,12 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
return s.cluster
}

// DirectlyGetRaftCluster returns raft cluster directly.
// Only used for tests.
func (s *Server) DirectlyGetRaftCluster() *cluster.RaftCluster {
return s.cluster
}

// GetCluster gets cluster.
func (s *Server) GetCluster() *metapb.Cluster {
return &metapb.Cluster{
21 changes: 18 additions & 3 deletions tests/server/region_syncer/region_syncer_test.go
@@ -47,21 +47,34 @@ func (i *idAllocator) alloc() uint64 {
func TestRegionSyncer(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/regionStorageFastFlush", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/noFastExitSync", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/disableClientStreaming", `return(true)`))

cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
defer cluster.Destroy()
defer func() {
cluster.Destroy()
cancel()
}()
re.NoError(err)

re.NoError(cluster.RunInitialServers())
cluster.WaitLeader()
leaderServer := cluster.GetLeaderServer()

re.NoError(leaderServer.BootstrapCluster())
rc := leaderServer.GetServer().GetRaftCluster()
re.NotNil(rc)
followerServer := cluster.GetServer(cluster.GetFollower())

testutil.Eventually(re, func() bool {
return !followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunning()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/syncer/disableClientStreaming"))
re.True(cluster.WaitRegionSyncerClientsReady(2))
testutil.Eventually(re, func() bool {
return followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunning()
})

regionLen := 110
regions := initRegions(regionLen)
@@ -119,7 +132,6 @@ func TestRegionSyncer(t *testing.T) {
time.Sleep(4 * time.Second)

// test that all regions have been synchronized to the followerServer's cache
followerServer := cluster.GetServer(cluster.GetFollower())
re.NotNil(followerServer)
cacheRegions := leaderServer.GetServer().GetBasicCluster().GetRegions()
re.Len(cacheRegions, regionLen)
@@ -141,6 +153,9 @@ func TestRegionSyncer(t *testing.T) {
re.NoError(err)
cluster.WaitLeader()
leaderServer = cluster.GetLeaderServer()
testutil.Eventually(re, func() bool {
return !leaderServer.GetServer().GetRaftCluster().GetRegionSyncer().IsRunning()
})
re.NotNil(leaderServer)
loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions()
re.Len(loadRegions, regionLen)
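The test's shape is: enable the `disableClientStreaming` failpoint before the cluster starts, assert via `testutil.Eventually` that the follower's syncer reports not running, then disable the failpoint and assert the flag eventually flips to true. Checking the not-running state first matters — without the failpoint, the follower could finish its first history sync before the assertion runs, making the `false` check flaky. The same polling idea reduced to the standard library, with a hypothetical `eventually` helper rather than PD's `testutil` API:

```go
package main

import (
	"fmt"
	"time"
)

// eventually polls cond until it returns true or timeout elapses,
// mirroring how testutil.Eventually is used in the test above.
func eventually(timeout, interval time.Duration, cond func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	start := time.Now()
	ok := eventually(time.Second, 10*time.Millisecond, func() bool {
		return time.Since(start) > 100*time.Millisecond // flips to true later
	})
	fmt.Println(ok) // true
}
```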
