Skip to content

Commit

Permalink
notify only if refreshed
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas committed Apr 4, 2024
1 parent a8f23b7 commit c2dceb7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 23 deletions.
21 changes: 12 additions & 9 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (r *ring) Start() {
r.logger.Fatal("subscribing to peer provider", tag.Error(err))
}

if err := r.refresh(); err != nil {
if _, err := r.refresh(); err != nil {
r.logger.Fatal("failed to start service resolver", tag.Error(err))
}

Expand Down Expand Up @@ -235,22 +235,22 @@ func (r *ring) Members() []HostInfo {
return hosts
}

func (r *ring) refresh() error {
func (r *ring) refresh() (refreshed bool, err error) {
if r.members.refreshed.After(time.Now().Add(-minRefreshInternal)) {
// refreshed too frequently
return nil
return false, nil
}

members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
return fmt.Errorf("getting members from peer provider: %w", err)
return false, fmt.Errorf("getting members from peer provider: %w", err)
}

r.members.Lock()
defer r.members.Unlock()
newMembersMap, changed := r.compareMembers(members)
if !changed {
return nil
return false, nil
}

ring := emptyHashring()
Expand All @@ -261,7 +261,7 @@ func (r *ring) refresh() error {
r.value.Store(ring)
r.logger.Info("refreshed ring members", tag.Value(members))

return nil
return true, nil
}

func (r *ring) refreshRingWorker() {
Expand All @@ -274,13 +274,16 @@ func (r *ring) refreshRingWorker() {
case <-r.shutdownCh:
return
case event := <-r.refreshChan: // local signal or signal from provider
if err := r.refresh(); err != nil {
refreshed, err := r.refresh()
if err != nil {
r.logger.Error("refreshing ring", tag.Error(err))
}
r.notifySubscribers(event)
if refreshed {
r.notifySubscribers(event)
}
case <-refreshTicker.C: // periodically refresh membership
r.emitHashIdentifier()
if err := r.refresh(); err != nil {
if _, err := r.refresh(); err != nil {
r.logger.Error("periodically refreshing ring", tag.Error(err))
}
}
Expand Down
59 changes: 45 additions & 14 deletions common/membership/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func randSeq(n int) string {
}

func randomHostInfo(n int) []HostInfo {
res := make([]HostInfo, n)
res := make([]HostInfo, 0, n)
for i := 0; i < n; i++ {
res = append(res, NewHostInfo(randSeq(5)))
res = append(res, NewDetailedHostInfo(randSeq(5), randSeq(12), PortMap{randSeq(3): 666}))
}
return res
}
Expand Down Expand Up @@ -116,23 +116,31 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) {
pp := NewMockPeerProvider(ctrl)

pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(3)
pp.EXPECT().GetMembers("test-service").Times(1).Return(randomHostInfo(3), nil)

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
// Start will also call .refresh()
hr.Start()

hr.refresh()
updatedAt := hr.members.refreshed
hr.refresh()
refreshed, err := hr.refresh()

assert.NoError(t, err)
assert.False(t, refreshed)
assert.Equal(t, updatedAt, hr.members.refreshed)

}

func TestRefreshWillNotifySubscribers(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
var hostsToReturn []HostInfo
pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").AnyTimes()
pp.EXPECT().GetMembers("test-service").Times(2).DoAndReturn(func(service string) ([]HostInfo, error) {
hostsToReturn = randomHostInfo(5)
time.Sleep(time.Millisecond * 70)
return hostsToReturn, nil
})

changed := &ChangedEvent{
HostsAdded: []string{"a"},
Expand All @@ -142,14 +150,35 @@ func TestRefreshWillNotifySubscribers(t *testing.T) {

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()
var changeCh = make(chan *ChangedEvent)

err := hr.Subscribe("notifyMe", changeCh)
assert.NoError(t, err)
var changeCh1 = make(chan *ChangedEvent)
var changeCh2 = make(chan *ChangedEvent)

go func() { hr.refreshChan <- changed }()
changedEvent := <-changeCh
assert.Equal(t, changed, changedEvent)
assert.NoError(t, hr.Subscribe("notifyMe1", changeCh1))
assert.NoError(t, hr.Subscribe("notifyMe2", changeCh2))

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
changedEvent := <-changeCh1
close(changeCh1)
assert.Equal(t, changed, changedEvent)
}()

go func() {
defer wg.Done()
changedEvent2 := <-changeCh2
close(changeCh2)
assert.Equal(t, changed, changedEvent2)
}()
// to bypass internal check
hr.members.refreshed = time.Now().AddDate(0, 0, -1)
hr.refreshChan <- changed
// Test if internal members are updated
_, ok := hr.members.keys[hostsToReturn[0].addr]
assert.True(t, ok, "members should contain just-added node")
wg.Wait() // wait until both subscribers will get notification

}

Expand Down Expand Up @@ -215,7 +244,8 @@ func TestErrorIsPropagatedWhenProviderFails(t *testing.T) {
pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error"))

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Error(t, hr.refresh())
_, err := hr.refresh()
assert.Error(t, err)
}

func TestStopWillStopProvider(t *testing.T) {
Expand Down Expand Up @@ -252,7 +282,8 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) {
for i := 0; i < 50; i++ {
// to bypass internal check
hr.members.refreshed = time.Now().AddDate(0, 0, -1)
assert.NoError(t, hr.refresh())
_, err := hr.refresh()
assert.NoError(t, err)
}
wg.Done()
}()
Expand Down

0 comments on commit c2dceb7

Please sign in to comment.