Skip to content

Commit

Permalink
add heap map
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim committed Oct 5, 2023
1 parent c520eb6 commit a53f73a
Show file tree
Hide file tree
Showing 13 changed files with 441 additions and 443 deletions.
68 changes: 17 additions & 51 deletions snow/engine/avalanche/vertex/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,16 @@
package vertex

import (
"container/heap"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/avalanche"
"github.com/ava-labs/avalanchego/utils/set"
)

var (
_ Heap = (*maxHeightVertexHeap)(nil)
_ heap.Interface = (*priorityQueue)(nil)
"github.com/ava-labs/avalanchego/utils/heap"
)

type priorityQueue []avalanche.Vertex
var _ Heap = (*maxHeightVertexHeap)(nil)

func (pq priorityQueue) Len() int {
return len(pq)
}

// Returns true if the vertex at index i has greater height than the vertex at
// index j.
func (pq priorityQueue) Less(i, j int) bool {
statusI := pq[i].Status()
statusJ := pq[j].Status()
func Less(i, j avalanche.Vertex) bool {
statusI := i.Status()
statusJ := j.Status()

// Put unknown vertices at the front of the heap to ensure once we have made
// it below a certain height in DAG traversal we do not need to reset
Expand All @@ -38,37 +25,17 @@ func (pq priorityQueue) Less(i, j int) bool {
}

// Treat errors on retrieving the height as if the vertex is not fetched
heightI, errI := pq[i].Height()
heightI, errI := i.Height()
if errI != nil {
return true
}
heightJ, errJ := pq[j].Height()
heightJ, errJ := j.Height()
if errJ != nil {
return false
}
return heightI > heightJ
}

func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}

// Push adds an item to this priority queue. x must have type *vertexItem
func (pq *priorityQueue) Push(x interface{}) {
item := x.(avalanche.Vertex)
*pq = append(*pq, item)
}

// Pop returns the last item in this priorityQueue
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil
*pq = old[0 : n-1]
return item
}

// Heap defines the functionality of a heap of vertices with unique VertexIDs
// ordered by height
type Heap interface {
Expand All @@ -91,38 +58,36 @@ type Heap interface {

// NewHeap returns an empty Heap
func NewHeap() Heap {
return &maxHeightVertexHeap{}
return &maxHeightVertexHeap{
heap: heap.NewMap[ids.ID, avalanche.Vertex](Less),
}
}

type maxHeightVertexHeap struct {
heap priorityQueue
elementIDs set.Set[ids.ID]
heap heap.Map[ids.ID, avalanche.Vertex]
}

func (vh *maxHeightVertexHeap) Clear() {
vh.heap = priorityQueue{}
vh.elementIDs.Clear()
vh.heap = heap.NewMap[ids.ID, avalanche.Vertex](Less)
}

// Push adds an element to this heap. Returns true if the element was added.
// Returns false if it was already in the heap.
func (vh *maxHeightVertexHeap) Push(vtx avalanche.Vertex) bool {
vtxID := vtx.ID()
if vh.elementIDs.Contains(vtxID) {
if _, ok := vh.heap.Index()[vtxID]; ok {
return false
}

vh.elementIDs.Add(vtxID)
heap.Push(&vh.heap, vtx)
vh.heap.Push(vtxID, vtx)
return true
}

// If there are any vertices in this heap with status Unknown, removes one such
// vertex and returns it. Otherwise, removes and returns the vertex in this heap
// with the greatest height.
func (vh *maxHeightVertexHeap) Pop() avalanche.Vertex {
vtx := heap.Pop(&vh.heap).(avalanche.Vertex)
vh.elementIDs.Remove(vtx.ID())
_, vtx, _ := vh.heap.Pop()
return vtx
}

Expand All @@ -131,5 +96,6 @@ func (vh *maxHeightVertexHeap) Len() int {
}

func (vh *maxHeightVertexHeap) Contains(vtxID ids.ID) bool {
return vh.elementIDs.Contains(vtxID)
_, ok := vh.heap.Index()[vtxID]
return ok
}
116 changes: 33 additions & 83 deletions snow/networking/benchlist/benchlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package benchlist

import (
"container/heap"
"fmt"
"math/rand"
"sync"
Expand All @@ -16,6 +15,7 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/heap"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/timer"
Expand All @@ -24,8 +24,6 @@ import (
safemath "github.com/ava-labs/avalanchego/utils/math"
)

var _ heap.Interface = (*benchedQueue)(nil)

// If a peer consistently does not respond to queries, it will
// increase latencies on the network whenever that peer is polled.
// If we cannot terminate the poll early, then the poll will wait
Expand All @@ -45,46 +43,6 @@ type Benchlist interface {
IsBenched(nodeID ids.NodeID) bool
}

// Data about a validator who is benched
type benchData struct {
benchedUntil time.Time
nodeID ids.NodeID
index int
}

// Each element is a benched validator
type benchedQueue []*benchData

func (bq benchedQueue) Len() int {
return len(bq)
}

func (bq benchedQueue) Less(i, j int) bool {
return bq[i].benchedUntil.Before(bq[j].benchedUntil)
}

func (bq benchedQueue) Swap(i, j int) {
bq[i], bq[j] = bq[j], bq[i]
bq[i].index = i
bq[j].index = j
}

// Push adds an item to this queue. x must have type *benchData
func (bq *benchedQueue) Push(x interface{}) {
item := x.(*benchData)
item.index = len(*bq)
*bq = append(*bq, item)
}

// Pop returns the validator that should leave the bench next
func (bq *benchedQueue) Pop() interface{} {
n := len(*bq)
item := (*bq)[n-1]
(*bq)[n-1] = nil // make sure the item is freed from memory
*bq = (*bq)[:n-1]
return item
}

type failureStreak struct {
// Time of first consecutive timeout
firstFailure time.Time
Expand Down Expand Up @@ -120,9 +78,8 @@ type benchlist struct {
// IDs of validators that are currently benched
benchlistSet set.Set[ids.NodeID]

// Min heap containing benched validators and their endtimes
// Pop() returns the next validator to leave
benchedQueue benchedQueue
// Min heap of benched validators ordered by when they can be unbenched
benchedHeap heap.Map[ids.NodeID, time.Time]

// A validator will be benched if [threshold] messages in a row
// to them time out and the first of those messages was more than
Expand Down Expand Up @@ -154,11 +111,14 @@ func NewBenchlist(
return nil, fmt.Errorf("max portion of benched stake must be in [0,1) but got %f", maxPortion)
}
benchlist := &benchlist{
chainID: chainID,
log: log,
failureStreaks: make(map[ids.NodeID]failureStreak),
benchlistSet: set.Set[ids.NodeID]{},
benchable: benchable,
chainID: chainID,
log: log,
failureStreaks: make(map[ids.NodeID]failureStreak),
benchlistSet: set.Set[ids.NodeID]{},
benchable: benchable,
benchedHeap: heap.NewMap[ids.NodeID, time.Time](func(a, b time.Time) bool {
return a.Before(b)
}),
vdrs: validators,
threshold: threshold,
minimumFailingDuration: minimumFailingDuration,
Expand All @@ -177,60 +137,53 @@ func (b *benchlist) update() {

now := b.clock.Time()
for {
// [next] is nil when no more validators should
// leave the bench at this time
next := b.nextToLeave(now)
if next == nil {
if !b.canUnbench(now) {
break
}
b.remove(next)
b.remove()
}
// Set next time update will be called
b.setNextLeaveTime()
}

// Remove [validator] from the benchlist
// Removes the next node from the benchlist
// Assumes [b.lock] is held
func (b *benchlist) remove(node *benchData) {
// Update state
id := node.nodeID
func (b *benchlist) remove() {
nodeID, _, _ := b.benchedHeap.Pop()
b.log.Debug("removing node from benchlist",
zap.Stringer("nodeID", id),
zap.Stringer("nodeID", nodeID),
)
heap.Remove(&b.benchedQueue, node.index)
b.benchlistSet.Remove(id)
b.benchable.Unbenched(b.chainID, id)
b.benchlistSet.Remove(nodeID)
b.benchable.Unbenched(b.chainID, nodeID)

// Update metrics
b.metrics.numBenched.Set(float64(b.benchedQueue.Len()))
b.metrics.numBenched.Set(float64(b.benchedHeap.Len()))
benchedStake := b.vdrs.SubsetWeight(b.benchlistSet)
b.metrics.weightBenched.Set(float64(benchedStake))
}

// Returns the next validator that should leave
// the bench at time [now]. nil if no validator should.
// Returns if a validator should leave the bench at time [now].
// False if no validator should.
// Assumes [b.lock] is held
func (b *benchlist) nextToLeave(now time.Time) *benchData {
if b.benchedQueue.Len() == 0 {
return nil
}
next := b.benchedQueue[0]
if now.Before(next.benchedUntil) {
return nil
func (b *benchlist) canUnbench(now time.Time) bool {
if b.benchedHeap.Len() == 0 {
return false
}
return next

_, next, _ := b.benchedHeap.Peek()
return now.After(next)
}

// Set [b.timer] to fire when the next validator should leave the bench
// Assumes [b.lock] is held
func (b *benchlist) setNextLeaveTime() {
if b.benchedQueue.Len() == 0 {
if b.benchedHeap.Len() == 0 {
b.timer.Cancel()
return
}
now := b.clock.Time()
next := b.benchedQueue[0]
nextLeave := next.benchedUntil.Sub(now)
_, next, _ := b.benchedHeap.Peek()
nextLeave := next.Sub(now)
b.timer.SetTimeoutIn(nextLeave)
}

Expand Down Expand Up @@ -336,10 +289,7 @@ func (b *benchlist) bench(nodeID ids.NodeID) {
delete(b.failureStreaks, nodeID)
b.streaklock.Unlock()

heap.Push(
&b.benchedQueue,
&benchData{nodeID: nodeID, benchedUntil: benchedUntil},
)
b.benchedHeap.Push(nodeID, benchedUntil)
b.log.Debug("benching validator after consecutive failed queries",
zap.Stringer("nodeID", nodeID),
zap.Duration("benchDuration", benchedUntil.Sub(now)),
Expand All @@ -350,6 +300,6 @@ func (b *benchlist) bench(nodeID ids.NodeID) {
b.setNextLeaveTime()

// Update metrics
b.metrics.numBenched.Set(float64(b.benchedQueue.Len()))
b.metrics.numBenched.Set(float64(b.benchedHeap.Len()))
b.metrics.weightBenched.Set(float64(newBenchedStake))
}
Loading

0 comments on commit a53f73a

Please sign in to comment.