Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove usage of timer.Timer in benchlist #2446

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 87 additions & 63 deletions snow/networking/benchlist/benchlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/heap"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/utils/timer/mockable"

safemath "github.com/ava-labs/avalanchego/utils/math"
Expand Down Expand Up @@ -54,9 +53,8 @@ type benchlist struct {
ctx *snow.ConsensusContext
metrics metrics

// Fires when the next validator should leave the bench
// Calls [update] when it fires
timer *timer.Timer
// Used to notify the timer that it should recalculate when it should fire
resetTimer chan struct{}

// Tells the time. Can be faked for testing.
clock mockable.Clock
Expand Down Expand Up @@ -105,8 +103,10 @@ func NewBenchlist(
if maxPortion < 0 || maxPortion >= 1 {
return nil, fmt.Errorf("max portion of benched stake must be in [0,1) but got %f", maxPortion)
}

benchlist := &benchlist{
ctx: ctx,
resetTimer: make(chan struct{}, 1),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the same hack here of a length 1 buffered channel as we do with block building. This allows us to be ensured that the timer will be reset after attempting to push a message without blocking on resetting the timer. This is important because the method attempting to reset the timer is holding a lock that the timer may also be attempting to grab.

failureStreaks: make(map[ids.NodeID]failureStreak),
benchlistSet: set.Set[ids.NodeID]{},
benchable: benchable,
Expand All @@ -117,38 +117,77 @@ func NewBenchlist(
duration: duration,
maxPortion: maxPortion,
}
benchlist.timer = timer.NewTimer(benchlist.update)
go benchlist.timer.Dispatch()
return benchlist, benchlist.metrics.Initialize(ctx.Registerer)
Comment on lines -120 to -122
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely odd that we initialized the metrics after kicking off the goroutine... Not a bug - but felt pretty close

if err := benchlist.metrics.Initialize(ctx.Registerer); err != nil {
return nil, err
}

go benchlist.run()
return benchlist, nil
}

// TODO: Close this goroutine during node shutdown
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never shutdown the timer previously... We still don't... But we should

func (b *benchlist) run() {
timer := time.NewTimer(0)
defer timer.Stop()

for {
// Invariant: The [timer] is not stopped.
select {
case <-timer.C:
case <-b.resetTimer:
if !timer.Stop() {
<-timer.C
}
}

b.waitForBenchedNodes()

b.removedExpiredNodes()

// Note: If there are no nodes to remove, [duration] will be 0 and we
// will immediately wait until there are benched nodes.
duration := b.durationToSleep()
timer.Reset(duration)
Copy link
Contributor Author

@StephenButtolph StephenButtolph Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: it is safe to reset a timer with a 0 or negative duration... I don't think that's actually possible... but it isn't a case we need to worry about

}
}

// Update removes benched validators whose time on the bench is over
func (b *benchlist) update() {
func (b *benchlist) waitForBenchedNodes() {
for {
b.lock.RLock()
_, _, ok := b.benchedHeap.Peek()
b.lock.RUnlock()
if ok {
return
}

// Invariant: Whenever a new node is benched we ensure that resetTimer
// has a pending message while the write lock is held.
<-b.resetTimer
}
}

func (b *benchlist) removedExpiredNodes() {
b.lock.Lock()
defer b.lock.Unlock()

now := b.clock.Time()
for {
if !b.canUnbench(now) {
_, next, ok := b.benchedHeap.Peek()
if !ok {
break
}
if now.Before(next) {
break
}
b.remove()
}
// Set next time update will be called
b.setNextLeaveTime()
}

// Removes the next node from the benchlist
// Assumes [b.lock] is held
func (b *benchlist) remove() {
nodeID, _, _ := b.benchedHeap.Pop()
b.ctx.Log.Debug("removing node from benchlist",
zap.Stringer("nodeID", nodeID),
)
b.benchlistSet.Remove(nodeID)
b.benchable.Unbenched(b.ctx.ChainID, nodeID)
nodeID, _, _ := b.benchedHeap.Pop()
b.ctx.Log.Debug("removing node from benchlist",
zap.Stringer("nodeID", nodeID),
)
b.benchlistSet.Remove(nodeID)
b.benchable.Unbenched(b.ctx.ChainID, nodeID)
}

// Update metrics
b.metrics.numBenched.Set(float64(b.benchedHeap.Len()))
benchedStake, err := b.vdrs.SubsetWeight(b.ctx.SubnetID, b.benchlistSet)
if err != nil {
Expand All @@ -161,56 +200,37 @@ func (b *benchlist) remove() {
b.metrics.weightBenched.Set(float64(benchedStake))
}

// Returns if a validator should leave the bench at time [now].
// False if no validator should.
// Assumes [b.lock] is held
func (b *benchlist) canUnbench(now time.Time) bool {
_, next, ok := b.benchedHeap.Peek()
if !ok {
return false
}
return now.After(next)
}
func (b *benchlist) durationToSleep() time.Duration {
b.lock.RLock()
defer b.lock.RUnlock()

// Set [b.timer] to fire when the next validator should leave the bench
// Assumes [b.lock] is held
func (b *benchlist) setNextLeaveTime() {
_, next, ok := b.benchedHeap.Peek()
if !ok {
b.timer.Cancel()
return
return 0
}

now := b.clock.Time()
nextLeave := next.Sub(now)
b.timer.SetTimeoutIn(nextLeave)
return next.Sub(now)
}

// IsBenched returns true if messages to [nodeID]
// should not be sent over the network and should immediately fail.
// IsBenched returns true if messages to [nodeID] should not be sent over the
// network and should immediately fail.
func (b *benchlist) IsBenched(nodeID ids.NodeID) bool {
b.lock.RLock()
defer b.lock.RUnlock()
return b.isBenched(nodeID)
}

// isBenched checks if [nodeID] is currently benched
// and calls cleanup if its benching period has elapsed
// Assumes [b.lock] is held.
func (b *benchlist) isBenched(nodeID ids.NodeID) bool {
if _, ok := b.benchlistSet[nodeID]; ok {
return true
}
return false
Comment on lines -200 to -203
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code made me sad

return b.benchlistSet.Contains(nodeID)
}

// RegisterResponse notes that we received a response from validator [validatorID]
// RegisterResponse notes that we received a response from [nodeID]
func (b *benchlist) RegisterResponse(nodeID ids.NodeID) {
b.streaklock.Lock()
defer b.streaklock.Unlock()

delete(b.failureStreaks, nodeID)
}

// RegisterResponse notes that a request to validator [validatorID] timed out
// RegisterResponse notes that a request to [nodeID] timed out
func (b *benchlist) RegisterFailure(nodeID ids.NodeID) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down Expand Up @@ -295,6 +315,12 @@ func (b *benchlist) bench(nodeID ids.NodeID) {
diff := maxBenchedUntil.Sub(minBenchedUntil)
benchedUntil := minBenchedUntil.Add(time.Duration(rand.Float64() * float64(diff))) // #nosec G404

b.ctx.Log.Debug("benching validator after consecutive failed queries",
zap.Stringer("nodeID", nodeID),
zap.Duration("benchDuration", benchedUntil.Sub(now)),
zap.Int("numFailedQueries", b.threshold),
)
Comment on lines +318 to +322
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It felt weird having this log after marking the node as benched.


// Add to benchlist times with randomized delay
b.benchlistSet.Add(nodeID)
b.benchable.Benched(b.ctx.ChainID, nodeID)
Expand All @@ -304,14 +330,12 @@ func (b *benchlist) bench(nodeID ids.NodeID) {
b.streaklock.Unlock()

b.benchedHeap.Push(nodeID, benchedUntil)
b.ctx.Log.Debug("benching validator after consecutive failed queries",
zap.Stringer("nodeID", nodeID),
zap.Duration("benchDuration", benchedUntil.Sub(now)),
zap.Int("numFailedQueries", b.threshold),
)

// Set [b.timer] to fire when next validator should leave bench
b.setNextLeaveTime()
// Update the timer to account for the newly benched node.
select {
case b.resetTimer <- struct{}{}:
default:
}

// Update metrics
b.metrics.numBenched.Set(float64(b.benchedHeap.Len()))
Expand Down
57 changes: 22 additions & 35 deletions snow/networking/benchlist/benchlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,14 @@ func TestBenchlistAdd(t *testing.T) {
)
require.NoError(err)
b := benchIntf.(*benchlist)
defer b.timer.Stop()
now := time.Now()
b.clock.Set(now)

// Nobody should be benched at the start
b.lock.Lock()
require.False(b.isBenched(vdrID0))
require.False(b.isBenched(vdrID1))
require.False(b.isBenched(vdrID2))
require.False(b.isBenched(vdrID3))
require.False(b.isBenched(vdrID4))
require.Empty(b.benchlistSet)
require.Empty(b.failureStreaks)
require.Zero(b.benchedHeap.Len())
require.Empty(b.benchlistSet)
b.lock.Unlock()

// Register [threshold - 1] failures in a row for vdr0
Expand All @@ -73,9 +67,8 @@ func TestBenchlistAdd(t *testing.T) {
}

// Still shouldn't be benched due to not enough consecutive failure
require.False(b.isBenched(vdrID0))
require.Zero(b.benchedHeap.Len())
require.Empty(b.benchlistSet)
require.Zero(b.benchedHeap.Len())
require.Len(b.failureStreaks, 1)
fs := b.failureStreaks[vdrID0]
require.Equal(threshold-1, fs.consecutive)
Expand All @@ -87,9 +80,8 @@ func TestBenchlistAdd(t *testing.T) {
// Still shouldn't be benched because not enough time (any in this case)
// has passed since the first failure
b.lock.Lock()
require.False(b.isBenched(vdrID0))
require.Zero(b.benchedHeap.Len())
require.Empty(b.benchlistSet)
require.Zero(b.benchedHeap.Len())
b.lock.Unlock()

// Move the time up
Expand All @@ -108,9 +100,9 @@ func TestBenchlistAdd(t *testing.T) {

// Now this validator should be benched
b.lock.Lock()
require.True(b.isBenched(vdrID0))
require.Equal(b.benchedHeap.Len(), 1)
require.Equal(b.benchlistSet.Len(), 1)
require.Contains(b.benchlistSet, vdrID0)
require.Equal(1, b.benchedHeap.Len())
require.Equal(1, b.benchlistSet.Len())

nodeID, benchedUntil, ok := b.benchedHeap.Peek()
require.True(ok)
Expand All @@ -133,10 +125,9 @@ func TestBenchlistAdd(t *testing.T) {
// vdr1 shouldn't be benched
// The response should have cleared its consecutive failures
b.lock.Lock()
require.True(b.isBenched(vdrID0))
require.False(b.isBenched(vdrID1))
require.Equal(b.benchedHeap.Len(), 1)
require.Equal(b.benchlistSet.Len(), 1)
require.Contains(b.benchlistSet, vdrID0)
require.Equal(1, b.benchedHeap.Len())
require.Equal(1, b.benchlistSet.Len())
require.Empty(b.failureStreaks)
b.lock.Unlock()

Expand Down Expand Up @@ -183,7 +174,6 @@ func TestBenchlistMaxStake(t *testing.T) {
)
require.NoError(err)
b := benchIntf.(*benchlist)
defer b.timer.Stop()
now := time.Now()
b.clock.Set(now)

Expand All @@ -209,11 +199,10 @@ func TestBenchlistMaxStake(t *testing.T) {
// Benching vdr2 (weight 1000) would cause the amount benched
// to exceed the maximum
b.lock.Lock()
require.True(b.isBenched(vdrID0))
require.True(b.isBenched(vdrID1))
require.False(b.isBenched(vdrID2))
require.Equal(b.benchedHeap.Len(), 2)
require.Equal(b.benchlistSet.Len(), 2)
require.Contains(b.benchlistSet, vdrID0)
require.Contains(b.benchlistSet, vdrID1)
require.Equal(2, b.benchedHeap.Len())
require.Equal(2, b.benchlistSet.Len())
require.Len(b.failureStreaks, 1)
fs := b.failureStreaks[vdrID2]
fs.consecutive = threshold
Expand All @@ -236,9 +225,9 @@ func TestBenchlistMaxStake(t *testing.T) {

// vdr4 should be benched now
b.lock.Lock()
require.True(b.isBenched(vdrID0))
require.True(b.isBenched(vdrID1))
require.True(b.isBenched(vdrID4))
require.Contains(b.benchlistSet, vdrID0)
require.Contains(b.benchlistSet, vdrID1)
require.Contains(b.benchlistSet, vdrID4)
require.Equal(3, b.benchedHeap.Len())
require.Equal(3, b.benchlistSet.Len())
require.Contains(b.benchlistSet, vdrID0)
Expand All @@ -254,10 +243,9 @@ func TestBenchlistMaxStake(t *testing.T) {
}

b.lock.Lock()
require.True(b.isBenched(vdrID0))
require.True(b.isBenched(vdrID1))
require.True(b.isBenched(vdrID4))
require.False(b.isBenched(vdrID2))
require.Contains(b.benchlistSet, vdrID0)
require.Contains(b.benchlistSet, vdrID1)
require.Contains(b.benchlistSet, vdrID4)
require.Equal(3, b.benchedHeap.Len())
require.Equal(3, b.benchlistSet.Len())
require.Len(b.failureStreaks, 1)
Expand Down Expand Up @@ -307,7 +295,6 @@ func TestBenchlistRemove(t *testing.T) {
)
require.NoError(err)
b := benchIntf.(*benchlist)
defer b.timer.Stop()
now := time.Now()
b.lock.Lock()
b.clock.Set(now)
Expand All @@ -332,9 +319,9 @@ func TestBenchlistRemove(t *testing.T) {

// All 3 should be benched
b.lock.Lock()
require.True(b.isBenched(vdrID0))
require.True(b.isBenched(vdrID1))
require.True(b.isBenched(vdrID2))
require.Contains(b.benchlistSet, vdrID0)
require.Contains(b.benchlistSet, vdrID1)
require.Contains(b.benchlistSet, vdrID2)
require.Equal(3, b.benchedHeap.Len())
require.Equal(3, b.benchlistSet.Len())
require.Empty(b.failureStreaks)
Expand Down
Loading