Skip to content

Commit

Permalink
fix:change file will not notify to client
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Sep 19, 2023
1 parent efe3b90 commit c36267c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 54 deletions.
9 changes: 7 additions & 2 deletions service/client_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ import (
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"

testsuit "github.com/polarismesh/polaris/test/suit"
)

func TestClientCheck(t *testing.T) {
discoverSuit := &DiscoverTestSuit{}
if err := discoverSuit.Initialize(); err != nil {
if err := discoverSuit.Initialize(func(cfg *testsuit.TestConfig) {
cfg.HealthChecks.ClientCheckTtl = time.Second
cfg.HealthChecks.ClientCheckInterval = 5 * time.Second
}); err != nil {
t.Fatal(err)
}
defer discoverSuit.Destroy()
Expand All @@ -48,7 +53,7 @@ func TestClientCheck(t *testing.T) {
})
time.Sleep(20 * time.Second)
clientIds := map[string]bool{clientId1: true, clientId2: true}
for i := 0; i < 50; i++ {
for i := 0; i < 10; i++ {
for clientId := range clientIds {
fmt.Printf("%d report client for %s, round 1\n", i, clientId)
discoverSuit.DiscoverServer().ReportClient(context.Background(),
Expand Down
4 changes: 2 additions & 2 deletions service/healthcheck/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type LeaderChangeEventHandler struct {

// newLeaderChangeEventHandler
func newLeaderChangeEventHandler(svr *Server) *LeaderChangeEventHandler {

return &LeaderChangeEventHandler{
svr: svr,
cacheProvider: svr.cacheProvider,
Expand Down Expand Up @@ -84,7 +83,8 @@ func (handler *LeaderChangeEventHandler) startCheckSelfServiceInstances() {
for {
select {
case <-ticker.C:
handler.cacheProvider.selfServiceInstances.Range(func(instanceId string, value ItemWithChecker) {
cacheProvider := handler.cacheProvider
cacheProvider.selfServiceInstances.Range(func(instanceId string, value ItemWithChecker) {
handler.doCheckSelfServiceInstance(value.GetInstance())
})
case <-ctx.Done():
Expand Down
13 changes: 7 additions & 6 deletions service/healthcheck/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,25 @@ func WithPlugins() serverOption {
}
}

// WithCacheProvider .
func WithCacheProvider() serverOption {
// withCacheProvider .
func withCacheProvider() serverOption {
return func(svr *Server) error {
svr.cacheProvider = newCacheProvider(svr.hcOpt.Service, svr)
return nil
}
}

// WithCheckScheduler .
func WithCheckScheduler(cs *CheckScheduler) serverOption {
// withCheckScheduler .
func withCheckScheduler(cs *CheckScheduler) serverOption {
return func(svr *Server) error {
svr.checkScheduler = cs
cs.svr = svr
return nil
}
}

// WithDispatcher .
func WithDispatcher(ctx context.Context) serverOption {
// withDispatcher .
func withDispatcher(ctx context.Context) serverOption {
return func(svr *Server) error {
svr.dispatcher = newDispatcher(ctx, svr)
return nil
Expand Down
17 changes: 11 additions & 6 deletions service/healthcheck/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,19 @@ func NewHealthServer(ctx context.Context, hcOpt *Config, options ...serverOption
hcOpt = &Config{}
}
hcOpt.SetDefault()
options = append(options, withSubscriber(ctx), withChecker())
options = append(options,
withChecker(),
withCacheProvider(),
withCheckScheduler(newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval,
hcOpt.MaxCheckInterval, hcOpt.ClientCheckInterval, hcOpt.ClientCheckTtl)),
withDispatcher(ctx),
// 这个必须保证在最后一个 option
withSubscriber(ctx),
)

svr := &Server{
hcOpt: hcOpt,
hcOpt: hcOpt,
localHost: hcOpt.LocalHost,
}
for i := range options {
if err := options[i](svr); err != nil {
Expand Down Expand Up @@ -116,11 +125,7 @@ func initialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batch.Co
WithPlugins(),
WithStore(storage),
WithBatchController(bc),
WithCheckScheduler(newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval,
hcOpt.MaxCheckInterval, hcOpt.ClientCheckInterval, hcOpt.ClientCheckTtl)),
WithDispatcher(ctx),
WithTimeAdjuster(newTimeAdjuster(ctx, storage)),
WithCacheProvider(),
)
if err != nil {
return err
Expand Down
46 changes: 8 additions & 38 deletions service/healthcheck/test_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,26 @@ import (
"context"
"fmt"

"github.com/polarismesh/polaris/plugin"
"github.com/polarismesh/polaris/service/batch"
"github.com/polarismesh/polaris/store"
)

func TestInitialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batch.Controller,
storage store.Store) (*Server, error) {

testServer := new(Server)
testServer.hcOpt = hcOpt

if !cacheOpen {
return nil, fmt.Errorf("[healthcheck]cache not open")
}
hcOpt.SetDefault()
if hcOpt.IsOpen() {
if len(hcOpt.Checkers) > 0 {
testServer.checkers = make(map[int32]plugin.HealthChecker, len(hcOpt.Checkers))
for _, entry := range hcOpt.Checkers {
checker := plugin.GetHealthChecker(entry.Name, &entry)
if checker == nil {
return nil, fmt.Errorf("[healthcheck]unknown healthchecker %s", entry.Name)
}
// The same health type check plugin can only exist in one
_, exist := testServer.checkers[int32(checker.Type())]
if exist {
return nil, fmt.Errorf("[healthcheck]duplicate healthchecker %s, checkType %d",
entry.Name, checker.Type())
}
testServer.checkers[int32(checker.Type())] = checker
if nil == testServer.defaultChecker {
testServer.defaultChecker = checker
}
}
} else {
return nil, fmt.Errorf("[healthcheck]no checker config")
}
testServer, err := NewHealthServer(ctx, hcOpt,
WithStore(storage),
WithBatchController(bc),
WithPlugins(),
WithTimeAdjuster(newTimeAdjuster(ctx, storage)),
)
if err != nil {
return nil, err
}
testServer.storage = storage
testServer.bc = bc

testServer.localHost = hcOpt.LocalHost
testServer.history = plugin.GetHistory()
testServer.discoverEvent = plugin.GetDiscoverEvent()

testServer.cacheProvider = newCacheProvider(hcOpt.Service, testServer)
testServer.timeAdjuster = newTimeAdjuster(ctx, testServer.storage)
testServer.checkScheduler = newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval,
hcOpt.MaxCheckInterval, hcOpt.ClientCheckInterval, hcOpt.ClientCheckTtl)
testServer.dispatcher = newDispatcher(ctx, testServer)
finishInit = true
return testServer, testServer.run(ctx)
}

0 comments on commit c36267c

Please sign in to comment.