This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Improve peer manager performance #395

Merged (3 commits) on May 21, 2020
Changes from 2 commits
22 changes: 6 additions & 16 deletions internal/peermanager/peermanager.go
@@ -90,9 +90,8 @@ func (pm *PeerManager) Connected(p peer.ID) {
 	pq := pm.getOrCreate(p)

 	// Inform the peer want manager that there's a new peer
-	wants := pm.pwm.addPeer(p)
-	// Broadcast any live want-haves to the newly connected peers
-	pq.AddBroadcastWantHaves(wants)
+	pm.pwm.addPeer(pq, p)
+
 	// Inform the sessions that the peer has connected
 	pm.signalAvailability(p, true)
 }
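The signature change above is the core of this hunk: `addPeer` now receives the peer's message queue, so the peer want manager can queue the live broadcast want-haves itself rather than handing them back to `Connected`. Below is a minimal sketch of what such an `addPeer` could look like; the `peerWantManager` fields (`peerWants`, `broadcastWants`) and the `peerWant` struct are assumed names for illustration, since `peerwantmanager.go` is not part of this diff.

```go
// Sketch only: peerWant, peerWants and broadcastWants are assumed names,
// not the actual peerwantmanager.go implementation.
package peermanager

import (
	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

type peerWantManager struct {
	// Per-peer want bookkeeping plus that peer's message queue.
	peerWants map[peer.ID]*peerWant
	// Want-haves that are broadcast to every connected peer.
	broadcastWants *cid.Set
}

type peerWant struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
	peerQueue  PeerQueue
}

// addPeer registers a newly connected peer and immediately queues the
// current broadcast want-haves on that peer's queue.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
	if _, ok := pwm.peerWants[p]; ok {
		return
	}
	pwm.peerWants[p] = &peerWant{
		wantBlocks: cid.NewSet(),
		wantHaves:  cid.NewSet(),
		peerQueue:  peerQueue,
	}
	// Broadcast any live want-haves to the newly connected peer.
	if pwm.broadcastWants.Len() > 0 {
		peerQueue.AddBroadcastWantHaves(pwm.broadcastWants.Keys())
	}
}
```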
@@ -138,11 +137,7 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) {
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()

-	for p, ks := range pm.pwm.prepareBroadcastWantHaves(wantHaves) {
-		if pq, ok := pm.peerQueues[p]; ok {
-			pq.AddBroadcastWantHaves(ks)
-		}
-	}
+	pm.pwm.broadcastWantHaves(wantHaves)
 }

 // SendWants sends the given want-blocks and want-haves to the given peer.
@@ -151,9 +146,8 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()

-	if pq, ok := pm.peerQueues[p]; ok {
-		wblks, whvs := pm.pwm.prepareSendWants(p, wantBlocks, wantHaves)
-		pq.AddWants(wblks, whvs)
+	if _, ok := pm.peerQueues[p]; ok {
+		pm.pwm.sendWants(p, wantBlocks, wantHaves)
 	}
 }

@@ -164,11 +158,7 @@ func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
 	defer pm.pqLk.Unlock()

 	// Send a CANCEL to each peer that has been sent a want-block or want-have
-	for p, ks := range pm.pwm.prepareSendCancels(cancelKs) {
-		if pq, ok := pm.peerQueues[p]; ok {
-			pq.AddCancels(ks)
-		}
-	}
+	pm.pwm.sendCancels(cancelKs)
 }

 // CurrentWants returns the list of pending wants (both want-haves and want-blocks).
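`BroadcastWantHaves`, `SendWants`, and `SendCancels` all follow the same pattern: the `prepare*` methods that returned per-peer CID maps for `PeerManager` to loop over are replaced by methods that apply the wants and cancels directly to the peer queues the want manager already holds. A rough sketch of the broadcast and cancel paths, reusing the assumed fields from the `addPeer` sketch above (again an illustration, not the actual `peerwantmanager.go` code):

```go
// broadcastWantHaves queues a want-have for each new CID on every peer's
// queue, skipping peers that were already sent a want-have for that CID.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
	unsent := make([]cid.Cid, 0, len(wantHaves))
	for _, c := range wantHaves {
		if pwm.broadcastWants.Has(c) {
			continue
		}
		pwm.broadcastWants.Add(c)
		unsent = append(unsent, c)
	}

	for _, pws := range pwm.peerWants {
		send := make([]cid.Cid, 0, len(unsent))
		for _, c := range unsent {
			if !pws.wantHaves.Has(c) {
				send = append(send, c)
			}
		}
		if len(send) > 0 {
			pws.peerQueue.AddBroadcastWantHaves(send)
		}
	}
}

// sendCancels queues a cancel on each peer that was previously sent a
// want-block or want-have for the CID, and clears the local bookkeeping.
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
	for _, pws := range pwm.peerWants {
		cancels := make([]cid.Cid, 0, len(cancelKs))
		for _, c := range cancelKs {
			if pws.wantBlocks.Has(c) || pws.wantHaves.Has(c) {
				pws.wantBlocks.Remove(c)
				pws.wantHaves.Remove(c)
				cancels = append(cancels, c)
			}
		}
		if len(cancels) > 0 {
			pws.peerQueue.AddCancels(cancels)
		}
	}
	for _, c := range cancelKs {
		pwm.broadcastWants.Remove(c)
	}
}
```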
59 changes: 59 additions & 0 deletions internal/peermanager/peermanager_test.go
@@ -2,6 +2,7 @@ package peermanager

 import (
 	"context"
+	"math/rand"
 	"testing"
 	"time"

@@ -318,3 +319,61 @@ func TestSessionRegistration(t *testing.T) {
 		t.Fatal("Expected no signal callback (session unregistered)")
 	}
 }
+
+type benchPeerQueue struct {
+}
+
+func (*benchPeerQueue) Startup() {}
+func (*benchPeerQueue) Shutdown() {}
+
+func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {}
+func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
+func (*benchPeerQueue) AddCancels(cs []cid.Cid) {}
+func (*benchPeerQueue) ResponseReceived(ks []cid.Cid) {}
+
+// Simplistic benchmark to allow us to stress test
+func BenchmarkPeerManager(b *testing.B) {
+	b.StopTimer()
+
+	ctx := context.Background()
+
+	peerQueueFactory := func(ctx context.Context, p peer.ID) PeerQueue {
+		return &benchPeerQueue{}
+	}
+
+	self := testutil.GeneratePeers(1)[0]
+	peers := testutil.GeneratePeers(500)
+	peerManager := New(ctx, peerQueueFactory, self)
+
+	// Create a bunch of connections
+	connected := 0
+	for i := 0; i < len(peers); i++ {
+		peerManager.Connected(peers[i])
+		connected++
+	}
+
+	var wanted []cid.Cid
+
+	b.StartTimer()
+	for n := 0; n < b.N; n++ {
+		// Pick a random peer
+		i := rand.Intn(connected)
+
+		// Alternately add either a few wants or many broadcast wants
+		r := rand.Intn(8)
+		if r == 0 {
+			wants := testutil.GenerateCids(10)
+			peerManager.SendWants(ctx, peers[i], wants[:2], wants[2:])
+			wanted = append(wanted, wants...)
+		} else if r == 1 {
+			wants := testutil.GenerateCids(30)
+			peerManager.BroadcastWantHaves(ctx, wants)
+			wanted = append(wanted, wants...)
+		} else {
+			limit := len(wanted) / 10
+			cancel := wanted[:limit]
+			wanted = wanted[limit:]
+			peerManager.SendCancels(ctx, cancel)
+		}
+	}
+}
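The benchmark keeps 500 connected peers and, on each iteration, randomly issues either a small `SendWants` (2 want-blocks and 8 want-haves), a 30-CID `BroadcastWantHaves`, or a `SendCancels` for the oldest 10% of outstanding wants, so all three rewritten paths are exercised. It can be run on its own with the standard Go tooling, for example `go test -run '^$' -bench BenchmarkPeerManager ./internal/peermanager/`.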