From c2dceb7dbf5777ef6b4c0479755af1def3b23bb9 Mon Sep 17 00:00:00 2001 From: Mantas Sidlauskas Date: Thu, 4 Apr 2024 16:04:42 +0300 Subject: [PATCH] notify only if refreshed --- common/membership/hashring.go | 21 ++++++----- common/membership/hashring_test.go | 59 +++++++++++++++++++++++------- 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/common/membership/hashring.go b/common/membership/hashring.go index 7a71bb686f0..7c1b6b54759 100644 --- a/common/membership/hashring.go +++ b/common/membership/hashring.go @@ -124,7 +124,7 @@ func (r *ring) Start() { r.logger.Fatal("subscribing to peer provider", tag.Error(err)) } - if err := r.refresh(); err != nil { + if _, err := r.refresh(); err != nil { r.logger.Fatal("failed to start service resolver", tag.Error(err)) } @@ -235,22 +235,22 @@ func (r *ring) Members() []HostInfo { return hosts } -func (r *ring) refresh() error { +func (r *ring) refresh() (refreshed bool, err error) { if r.members.refreshed.After(time.Now().Add(-minRefreshInternal)) { // refreshed too frequently - return nil + return false, nil } members, err := r.peerProvider.GetMembers(r.service) if err != nil { - return fmt.Errorf("getting members from peer provider: %w", err) + return false, fmt.Errorf("getting members from peer provider: %w", err) } r.members.Lock() defer r.members.Unlock() newMembersMap, changed := r.compareMembers(members) if !changed { - return nil + return false, nil } ring := emptyHashring() @@ -261,7 +261,7 @@ func (r *ring) refresh() error { r.value.Store(ring) r.logger.Info("refreshed ring members", tag.Value(members)) - return nil + return true, nil } func (r *ring) refreshRingWorker() { @@ -274,13 +274,16 @@ func (r *ring) refreshRingWorker() { case <-r.shutdownCh: return case event := <-r.refreshChan: // local signal or signal from provider - if err := r.refresh(); err != nil { + refreshed, err := r.refresh() + if err != nil { r.logger.Error("refreshing ring", tag.Error(err)) } - r.notifySubscribers(event) + if refreshed { + r.notifySubscribers(event) + } case <-refreshTicker.C: // periodically refresh membership r.emitHashIdentifier() - if err := r.refresh(); err != nil { + if _, err := r.refresh(); err != nil { r.logger.Error("periodically refreshing ring", tag.Error(err)) } } diff --git a/common/membership/hashring_test.go b/common/membership/hashring_test.go index 3f30d196c95..8b942f8c02d 100644 --- a/common/membership/hashring_test.go +++ b/common/membership/hashring_test.go @@ -48,9 +48,9 @@ func randSeq(n int) string { } func randomHostInfo(n int) []HostInfo { - res := make([]HostInfo, n) + res := make([]HostInfo, 0, n) for i := 0; i < n; i++ { - res = append(res, NewHostInfo(randSeq(5))) + res = append(res, NewDetailedHostInfo(randSeq(5), randSeq(12), PortMap{randSeq(3): 666})) } return res } @@ -116,14 +116,17 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) { pp := NewMockPeerProvider(ctrl) pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) - pp.EXPECT().GetMembers("test-service").Times(3) + pp.EXPECT().GetMembers("test-service").Times(1).Return(randomHostInfo(3), nil) hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) + // Start will also call .refresh() hr.Start() - - hr.refresh() updatedAt := hr.members.refreshed hr.refresh() + refreshed, err := hr.refresh() + + assert.NoError(t, err) + assert.False(t, refreshed) assert.Equal(t, updatedAt, hr.members.refreshed) } @@ -131,8 +134,13 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) { func TestRefreshWillNotifySubscribers(t *testing.T) { ctrl := gomock.NewController(t) pp := NewMockPeerProvider(ctrl) + var hostsToReturn []HostInfo pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) - pp.EXPECT().GetMembers("test-service").AnyTimes() + pp.EXPECT().GetMembers("test-service").Times(2).DoAndReturn(func(service string) ([]HostInfo, error) { + hostsToReturn = randomHostInfo(5) + time.Sleep(time.Millisecond * 70) + return hostsToReturn, nil + }) changed := &ChangedEvent{ HostsAdded: []string{"a"}, @@ -142,14 +150,35 @@ func TestRefreshWillNotifySubscribers(t *testing.T) { hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) hr.Start() - var changeCh = make(chan *ChangedEvent) - err := hr.Subscribe("notifyMe", changeCh) - assert.NoError(t, err) + var changeCh1 = make(chan *ChangedEvent) + var changeCh2 = make(chan *ChangedEvent) - go func() { hr.refreshChan <- changed }() - changedEvent := <-changeCh - assert.Equal(t, changed, changedEvent) + assert.NoError(t, hr.Subscribe("notifyMe1", changeCh1)) + assert.NoError(t, hr.Subscribe("notifyMe2", changeCh2)) + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + changedEvent := <-changeCh1 + close(changeCh1) + assert.Equal(t, changed, changedEvent) + }() + + go func() { + defer wg.Done() + changedEvent2 := <-changeCh2 + close(changeCh2) + assert.Equal(t, changed, changedEvent2) + }() + // to bypass internal check + hr.members.refreshed = time.Now().AddDate(0, 0, -1) + hr.refreshChan <- changed + // Test if internal members are updated + _, ok := hr.members.keys[hostsToReturn[0].addr] + assert.True(t, ok, "members should contain just-added node") + wg.Wait() // wait until both subscribers will get notification } @@ -215,7 +244,8 @@ func TestErrorIsPropagatedWhenProviderFails(t *testing.T) { pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error")) hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) - assert.Error(t, hr.refresh()) + _, err := hr.refresh() + assert.Error(t, err) } func TestStopWillStopProvider(t *testing.T) { @@ -252,7 +282,8 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) { for i := 0; i < 50; i++ { // to bypass internal check hr.members.refreshed = time.Now().AddDate(0, 0, -1) - assert.NoError(t, hr.refresh()) + _, err := hr.refresh() + assert.NoError(t, err) } wg.Done() }()