diff --git a/go.mod b/go.mod index 9ffb04a4..457b9e04 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ retract v0.1.0 require ( github.com/celestiaorg/go-libp2p-messenger v0.2.0 + github.com/gammazero/deque v0.2.1 github.com/gogo/protobuf v1.3.2 github.com/hashicorp/golang-lru v0.5.4 github.com/ipfs/go-datastore v0.6.0 diff --git a/go.sum b/go.sum index 28ee7641..7336f067 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= +github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= diff --git a/p2p/exchange.go b/p2p/exchange.go index 99d7f87c..1cad250b 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -190,7 +190,7 @@ func (ex *Exchange[H]) GetRangeByHeight(ctx context.Context, from, amount uint64 if amount > header.MaxRangeRequestSize { return nil, header.ErrHeadersLimitExceeded } - session := newSession[H](ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout) + session := newSession[H](ex.ctx, ex.host, ex.peerTracker, ex.trustedPeers(), ex.protocolID, ex.Params.RangeRequestTimeout) defer session.close() return session.getRangeByHeight(ctx, from, amount, ex.Params.MaxHeadersPerRangeRequest) } @@ -206,7 +206,7 @@ func (ex *Exchange[H]) GetVerifiedRange( return make([]H, 0), nil } session := newSession[H]( - ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from), + ex.ctx, ex.host, ex.peerTracker, ex.trustedPeers(), ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from), ) defer session.close() // we request the next header height that we don't have: `fromHead`+1 diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go index ced05929..e427afd5 100644 --- a/p2p/peer_stats.go +++ b/p2p/peer_stats.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/gammazero/deque" "github.com/libp2p/go-libp2p/core/peer" ) @@ -100,22 +101,34 @@ func (ps *peerStats) Pop() any { type peerQueue struct { ctx context.Context - statsLk sync.RWMutex - stats peerStats + peerQueueLk sync.RWMutex + + stats peerStats + // capacity specifies how many active peers do we have in the queue. + // It will be decreased if peer will return errors. + capacity int + + // deque contains a set of trusted peers that are needed for the fallback solution. + deque *deque.Deque[peer.ID] havePeer chan struct{} } -func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue { +func newPeerQueue(ctx context.Context, stats []*peerStat, trustedPeers []peer.ID) *peerQueue { statsCh := make(chan struct{}, len(stats)) pq := &peerQueue{ ctx: ctx, stats: newPeerStats(), + deque: deque.New[peer.ID](len(trustedPeers)), havePeer: statsCh, } for _, stat := range stats { pq.push(stat) } + + for _, pID := range trustedPeers { + pq.deque.PushBack(pID) + } return pq } @@ -123,10 +136,12 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue { // in case if there are no peer available in current session, it blocks until // the peer will be pushed in. func (p *peerQueue) waitPop(ctx context.Context) *peerStat { - // TODO(vgonkivs): implement fallback solution for cases when peer queue is empty. - // As we discussed with @Wondertan there could be 2 possible solutions: - // * use libp2p.Discovery to find new peers outside peerTracker to request headers; - // * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore; + if p.empty() { + stat := &peerStat{peerID: p.deque.PopFront()} + p.deque.PushBack(stat.peerID) + return stat + } + select { case <-ctx.Done(): return &peerStat{} @@ -134,16 +149,35 @@ func (p *peerQueue) waitPop(ctx context.Context) *peerStat { return &peerStat{} case <-p.havePeer: } - p.statsLk.Lock() - defer p.statsLk.Unlock() + p.peerQueueLk.Lock() + defer p.peerQueueLk.Unlock() return heap.Pop(&p.stats).(*peerStat) } // push adds the peer to the queue. func (p *peerQueue) push(stat *peerStat) { - p.statsLk.Lock() + if p.empty() { + return + } + + p.peerQueueLk.Lock() heap.Push(&p.stats, stat) - p.statsLk.Unlock() + p.peerQueueLk.Unlock() // notify that the peer is available in the queue, so it can be popped out p.havePeer <- struct{}{} } + +func (p *peerQueue) decreaseCapacity() { + p.peerQueueLk.Lock() + defer p.peerQueueLk.Unlock() + if p.capacity == 0 { + return + } + p.capacity-- +} + +func (p *peerQueue) empty() bool { + p.peerQueueLk.Lock() + defer p.peerQueueLk.Unlock() + return p.capacity == 0 +} diff --git a/p2p/peer_stats_test.go b/p2p/peer_stats_test.go index d5a875d1..db5dab85 100644 --- a/p2p/peer_stats_test.go +++ b/p2p/peer_stats_test.go @@ -37,7 +37,7 @@ func Test_PeerQueuePopBestPeer(t *testing.T) { } // we do not need timeout/cancel functionality here - pQueue := newPeerQueue(context.Background(), peersStat) + pQueue := newPeerQueue(context.Background(), peersStat, []peer.ID{}) for index := 0; index < pQueue.stats.Len(); index++ { stats := heap.Pop(&pQueue.stats).(*peerStat) require.Equal(t, stats, wantStat[index]) @@ -53,7 +53,7 @@ func Test_PeerQueueRemovePeer(t *testing.T) { } // we do not need timeout/cancel functionality here - pQueue := newPeerQueue(context.Background(), peersStat) + pQueue := newPeerQueue(context.Background(), peersStat, []peer.ID{}) _ = heap.Pop(&pQueue.stats) stat := heap.Pop(&pQueue.stats).(*peerStat) @@ -62,7 +62,7 @@ func Test_PeerQueueRemovePeer(t *testing.T) { func Test_StatsUpdateStats(t *testing.T) { // we do not need timeout/cancel functionality here - pQueue := newPeerQueue(context.Background(), []*peerStat{}) + pQueue := newPeerQueue(context.Background(), []*peerStat{}, []peer.ID{}) stat := &peerStat{peerID: "peerID", peerScore: 0} heap.Push(&pQueue.stats, stat) testCases := []struct { diff --git a/p2p/session.go b/p2p/session.go index 38107e9b..40dadb8e 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -8,6 +8,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/celestiaorg/go-header" @@ -49,6 +50,7 @@ func newSession[H header.Header]( ctx context.Context, h host.Host, peerTracker *peerTracker, + trustedPeers []peer.ID, protocolID protocol.ID, requestTimeout time.Duration, options ...option[H], @@ -59,7 +61,7 @@ func newSession[H header.Header]( cancel: cancel, protocolID: protocolID, host: h, - queue: newPeerQueue(ctx, peerTracker.peers()), + queue: newPeerQueue(ctx, peerTracker.peers(), trustedPeers), peerTracker: peerTracker, requestTimeout: requestTimeout, } @@ -162,6 +164,7 @@ func (s *session[H]) doRequest( h, err := s.processResponse(r) if err != nil { logFn := log.Errorw + s.queue.decreaseCapacity() switch err { case header.ErrNotFound, errEmptyResponse: diff --git a/p2p/session_test.go b/p2p/session_test.go index 6d684e0a..25f21a57 100644 --- a/p2p/session_test.go +++ b/p2p/session_test.go @@ -29,6 +29,7 @@ func Test_Validate(t *testing.T) { context.Background(), nil, &peerTracker{trackedPeers: make(map[peer.ID]*peerStat)}, + []peer.ID{}, "", time.Second, withValidation(head), ) @@ -46,6 +47,7 @@ func Test_ValidateFails(t *testing.T) { context.Background(), nil, &peerTracker{trackedPeers: make(map[peer.ID]*peerStat)}, + []peer.ID{}, "", time.Second, withValidation(head), )