Skip to content

Commit

Permalink
refactoring(p2p): add fallback solution if queue becomes empty
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jul 31, 2023
1 parent 6cf3094 commit 5089c00
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 17 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ retract v0.1.0

require (
github.com/celestiaorg/go-libp2p-messenger v0.2.0
github.com/gammazero/deque v0.2.1
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-datastore v0.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
Expand Down
4 changes: 2 additions & 2 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (ex *Exchange[H]) GetRangeByHeight(ctx context.Context, from, amount uint64
if amount > header.MaxRangeRequestSize {
return nil, header.ErrHeadersLimitExceeded
}
session := newSession[H](ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout)
session := newSession[H](ex.ctx, ex.host, ex.peerTracker, ex.trustedPeers(), ex.protocolID, ex.Params.RangeRequestTimeout)
defer session.close()
return session.getRangeByHeight(ctx, from, amount, ex.Params.MaxHeadersPerRangeRequest)
}
Expand All @@ -206,7 +206,7 @@ func (ex *Exchange[H]) GetVerifiedRange(
return make([]H, 0), nil
}
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from),
ex.ctx, ex.host, ex.peerTracker, ex.trustedPeers(), ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from),
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
Expand Down
56 changes: 45 additions & 11 deletions p2p/peer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/gammazero/deque"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand Down Expand Up @@ -100,50 +101,83 @@ func (ps *peerStats) Pop() any {
type peerQueue struct {
ctx context.Context

statsLk sync.RWMutex
stats peerStats
peerQueueLk sync.RWMutex

stats peerStats
// capacity specifies how many active peers do we have in the queue.
// It will be decreased if peer will return errors.
capacity int

// deque contains a set of trusted peers that are needed for the fallback solution.
deque *deque.Deque[peer.ID]

havePeer chan struct{}
}

func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
func newPeerQueue(ctx context.Context, stats []*peerStat, trustedPeers []peer.ID) *peerQueue {
statsCh := make(chan struct{}, len(stats))
pq := &peerQueue{
ctx: ctx,
stats: newPeerStats(),
deque: deque.New[peer.ID](len(trustedPeers)),
havePeer: statsCh,
}
for _, stat := range stats {
pq.push(stat)
}

for _, pID := range trustedPeers {
pq.deque.PushBack(pID)
}
return pq
}

// waitPop pops the peer with the biggest score.
// in case if there are no peer available in current session, it blocks until
// the peer will be pushed in.
func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
// TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
// As we discussed with @Wondertan there could be 2 possible solutions:
// * use libp2p.Discovery to find new peers outside peerTracker to request headers;
// * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
if p.empty() {
stat := &peerStat{peerID: p.deque.PopFront()}
p.deque.PushBack(stat.peerID)
return stat
}

select {
case <-ctx.Done():
return &peerStat{}
case <-p.ctx.Done():
return &peerStat{}
case <-p.havePeer:
}
p.statsLk.Lock()
defer p.statsLk.Unlock()
p.peerQueueLk.Lock()
defer p.peerQueueLk.Unlock()
return heap.Pop(&p.stats).(*peerStat)
}

// push adds the peer to the queue.
func (p *peerQueue) push(stat *peerStat) {
p.statsLk.Lock()
if p.empty() {
return
}

p.peerQueueLk.Lock()
heap.Push(&p.stats, stat)
p.statsLk.Unlock()
p.peerQueueLk.Unlock()
// notify that the peer is available in the queue, so it can be popped out
p.havePeer <- struct{}{}
}

func (p *peerQueue) decreaseCapacity() {
p.peerQueueLk.Lock()
defer p.peerQueueLk.Unlock()
if p.capacity == 0 {
return
}
p.capacity--
}

func (p *peerQueue) empty() bool {
p.peerQueueLk.Lock()
defer p.peerQueueLk.Unlock()
return p.capacity == 0
}
6 changes: 3 additions & 3 deletions p2p/peer_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Test_PeerQueuePopBestPeer(t *testing.T) {
}

// we do not need timeout/cancel functionality here
pQueue := newPeerQueue(context.Background(), peersStat)
pQueue := newPeerQueue(context.Background(), peersStat, []peer.ID{})
for index := 0; index < pQueue.stats.Len(); index++ {
stats := heap.Pop(&pQueue.stats).(*peerStat)
require.Equal(t, stats, wantStat[index])
Expand All @@ -53,7 +53,7 @@ func Test_PeerQueueRemovePeer(t *testing.T) {
}

// we do not need timeout/cancel functionality here
pQueue := newPeerQueue(context.Background(), peersStat)
pQueue := newPeerQueue(context.Background(), peersStat, []peer.ID{})

_ = heap.Pop(&pQueue.stats)
stat := heap.Pop(&pQueue.stats).(*peerStat)
Expand All @@ -62,7 +62,7 @@ func Test_PeerQueueRemovePeer(t *testing.T) {

func Test_StatsUpdateStats(t *testing.T) {
// we do not need timeout/cancel functionality here
pQueue := newPeerQueue(context.Background(), []*peerStat{})
pQueue := newPeerQueue(context.Background(), []*peerStat{}, []peer.ID{})
stat := &peerStat{peerID: "peerID", peerScore: 0}
heap.Push(&pQueue.stats, stat)
testCases := []struct {
Expand Down
5 changes: 4 additions & 1 deletion p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/celestiaorg/go-header"
Expand Down Expand Up @@ -49,6 +50,7 @@ func newSession[H header.Header](
ctx context.Context,
h host.Host,
peerTracker *peerTracker,
trustedPeers []peer.ID,
protocolID protocol.ID,
requestTimeout time.Duration,
options ...option[H],
Expand All @@ -59,7 +61,7 @@ func newSession[H header.Header](
cancel: cancel,
protocolID: protocolID,
host: h,
queue: newPeerQueue(ctx, peerTracker.peers()),
queue: newPeerQueue(ctx, peerTracker.peers(), trustedPeers),
peerTracker: peerTracker,
requestTimeout: requestTimeout,
}
Expand Down Expand Up @@ -162,6 +164,7 @@ func (s *session[H]) doRequest(
h, err := s.processResponse(r)
if err != nil {
logFn := log.Errorw
s.queue.decreaseCapacity()

switch err {
case header.ErrNotFound, errEmptyResponse:
Expand Down
2 changes: 2 additions & 0 deletions p2p/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func Test_Validate(t *testing.T) {
context.Background(),
nil,
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
[]peer.ID{},
"", time.Second,
withValidation(head),
)
Expand All @@ -46,6 +47,7 @@ func Test_ValidateFails(t *testing.T) {
context.Background(),
nil,
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
[]peer.ID{},
"", time.Second,
withValidation(head),
)
Expand Down

0 comments on commit 5089c00

Please sign in to comment.