diff --git a/query/peer_rank.go b/query/peer_rank.go index 77c6c8f4..995a15ba 100644 --- a/query/peer_rank.go +++ b/query/peer_rank.go @@ -91,3 +91,13 @@ func (p *peerRanking) Reward(peer string) { p.rank[peer] = score - 1 } + +// ResetRanking sets the score of the passed peer to the defaultScore. +func (p *peerRanking) ResetRanking(peer string) { + _, ok := p.rank[peer] + if !ok { + return + } + + p.rank[peer] = defaultScore +} diff --git a/query/peer_rank_test.go b/query/peer_rank_test.go index 17770e37..bb0f3d2a 100644 --- a/query/peer_rank_test.go +++ b/query/peer_rank_test.go @@ -46,14 +46,35 @@ func TestPeerRank(t *testing.T) { } } - // Lastly, reward the lowest scored one a bunch, which should move it + // This is the lowest scored peer after punishment. + const lowestScoredPeer = "peer0" + + // Reward the lowest scored one a bunch, which should move it // to the front. for i := 0; i < 10; i++ { - ranking.Reward("peer0") + ranking.Reward(lowestScoredPeer) } ranking.Order(peers) - if peers[0] != "peer0" { + if peers[0] != lowestScoredPeer { t.Fatalf("peer0 was not first") } + + // Punish the peer a bunch to make it the lowest scored one. + for i := 0; i < 10; i++ { + ranking.Punish(lowestScoredPeer) + } + + ranking.Order(peers) + if peers[len(peers)-1] != lowestScoredPeer { + t.Fatalf("peer0 should be last") + } + + // Reset its ranking. It should have the default score now + // and should not be the lowest ranked peer. + ranking.ResetRanking(lowestScoredPeer) + ranking.Order(peers) + if peers[len(peers)-1] == lowestScoredPeer { + t.Fatalf("peer0 should not be last.") + } } diff --git a/query/workmanager.go b/query/workmanager.go index e99f57ab..9217f49a 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -65,8 +65,11 @@ type PeerRanking interface { // queries. Punish(peer string) - // Order sorst the slice of peers according to their ranking. + // Order sorts the slice of peers according to their ranking. Order(peers []string) + + // ResetRanking sets the score of the passed peer to the defaultScore. + ResetRanking(peerAddr string) } // activeWorker wraps a Worker that is currently running, together with the job @@ -377,6 +380,11 @@ Loop: result.job.timeout = newTimeout } + // Refresh peer rank on disconnect. + if result.err == ErrPeerDisconnected { + w.cfg.Ranking.ResetRanking(result.peer.Addr()) + } + heap.Push(work, result.job) currentQueries[result.job.index] = batchNum diff --git a/query/workmanager_test.go b/query/workmanager_test.go index b7bec809..d7558a21 100644 --- a/query/workmanager_test.go +++ b/query/workmanager_test.go @@ -63,6 +63,9 @@ func (p *mockPeerRanking) Punish(peer string) { func (p *mockPeerRanking) Reward(peer string) { } +func (p *mockPeerRanking) ResetRanking(peer string) { +} + // startWorkManager starts a new workmanager with the given number of mock // workers. func startWorkManager(t *testing.T, numWorkers int) (WorkManager,