Skip to content

Commit

Permalink
wait and then multiple pops
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar committed Jul 10, 2023
1 parent 22dd383 commit f98cd96
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 42 deletions.
30 changes: 14 additions & 16 deletions network/dial/dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package dial

import (
"container/heap"
"context"
"sync"

"github.com/0xPolygon/polygon-edge/network/common"

"github.com/libp2p/go-libp2p/core/peer"
)

const updateChBufferSize = 20

// DialQueue is a queue that holds dials tasks for potential peers, implemented as a min-heap
type DialQueue struct {
sync.Mutex
Expand All @@ -27,7 +26,7 @@ func NewDialQueue() *DialQueue {
return &DialQueue{
heap: dialQueueImpl{},
tasks: map[peer.ID]*DialTask{},
updateCh: make(chan struct{}, updateChBufferSize),
updateCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
}
}
Expand All @@ -37,22 +36,21 @@ func (d *DialQueue) Close() {
close(d.closeCh)
}

// PopTask is a loop that handles update and close events [BLOCKING]
func (d *DialQueue) PopTask() *DialTask {
for {
select {
case <-d.updateCh: // blocks until AddTask is called...
if task := d.popTaskImpl(); task != nil {
return task
}
case <-d.closeCh: // ... or dial queue is closed
return nil
}
// Wait waits for closing or updating event or end of context.
// Returns true for closing event or end of the context [BLOCKING].
func (d *DialQueue) Wait(ctx context.Context) bool {
select {
case <-ctx.Done(): // blocks until context is done ...
return true
case <-d.updateCh: // ... or AddTask is called ...
return false
case <-d.closeCh: // ... or dial queue is closed
return true
}
}

// popTaskImpl is the implementation for task popping from the min-heap
func (d *DialQueue) popTaskImpl() *DialTask {
// PopTask is the implementation for task popping from the min-heap
func (d *DialQueue) PopTask() *DialTask {
d.Lock()
defer d.Unlock()

Expand Down
15 changes: 10 additions & 5 deletions network/dial/dial_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dial

import (
"context"
"testing"
"time"

Expand All @@ -11,6 +12,9 @@ import (
func TestDialQueue(t *testing.T) {
q := NewDialQueue()
infos := [3]*peer.AddrInfo{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

defer cancel()

for i, x := range []string{"a", "b", "c"} {
infos[i] = &peer.AddrInfo{
Expand All @@ -32,17 +36,18 @@ func TestDialQueue(t *testing.T) {
q.AddTask(infos[2], 1) // existing task, more priority
assert.Equal(t, 3, q.heap.Len())

assert.Equal(t, peer.ID("b"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("c"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("a"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("b"), q.PopTask().addrInfo.ID)
assert.Equal(t, peer.ID("c"), q.PopTask().addrInfo.ID)
assert.Equal(t, peer.ID("a"), q.PopTask().addrInfo.ID)
assert.Equal(t, 0, q.heap.Len())

assert.Nil(t, q.popTaskImpl())
assert.Nil(t, q.PopTask())

done := make(chan struct{})

go func() {
q.PopTask()
q.Wait(ctx) // wait for first update
q.Wait(ctx) // wait for second update (line 61)
done <- struct{}{}
}()

Expand Down
47 changes: 26 additions & 21 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,36 +381,41 @@ func (s *Server) runDial() {
}

for {
tt := s.dialQueue.PopTask()
if tt == nil {
// The dial queue is closed,
// no further dial tasks are incoming
if closed := s.dialQueue.Wait(ctx); closed {
// The dial queue is closed, no further dial tasks are incoming
return
}

peerInfo := tt.GetAddrInfo()
for {
tt := s.dialQueue.PopTask()
if tt == nil {
break
}

if s.IsConnected(peerInfo.ID) {
continue
}
peerInfo := tt.GetAddrInfo()

s.logger.Debug("Waiting for a dialing slot", "addr", peerInfo, "local", s.host.ID())
if s.IsConnected(peerInfo.ID) {
continue
}

if closed := slots.Take(ctx); closed {
return
}
s.logger.Debug("Waiting for a dialing slot", "addr", peerInfo, "local", s.host.ID())

// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
go func() {
s.logger.Debug("Dialing peer", "addr", peerInfo, "local", s.host.ID())
if closed := slots.Take(ctx); closed {
return
}

if err := s.host.Connect(ctx, *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())
// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
go func() {
s.logger.Debug("Dialing peer", "addr", peerInfo, "local", s.host.ID())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
}
}()
if err := s.host.Connect(ctx, *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
}
}()
}
}
}

Expand Down

0 comments on commit f98cd96

Please sign in to comment.