Skip to content

Commit

Permalink
WIP wait until we have public addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Oct 22, 2021
1 parent 1f5db5a commit 4036a06
Showing 1 changed file with 61 additions and 8 deletions.
69 changes: 61 additions & 8 deletions p2p/protocol/holepunch/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -56,6 +57,8 @@ type Service struct {
closed bool
refCount sync.WaitGroup

hasPublicAddrsChan chan struct{} // this chan is closed as soon as we have a public address

// active hole punches for deduplicating
activeMx sync.Mutex
active map[peer.ID]struct{}
Expand All @@ -71,11 +74,12 @@ func NewService(h host.Host, ids *identify.IDService, opts ...Option) (*Service,

ctx, cancel := context.WithCancel(context.Background())
hs := &Service{
ctx: ctx,
ctxCancel: cancel,
host: h,
ids: ids,
active: make(map[peer.ID]struct{}),
ctx: ctx,
ctxCancel: cancel,
host: h,
ids: ids,
active: make(map[peer.ID]struct{}),
hasPublicAddrsChan: make(chan struct{}),
}

for _, opt := range opts {
Expand All @@ -85,11 +89,42 @@ func NewService(h host.Host, ids *identify.IDService, opts ...Option) (*Service,
}
}

h.SetStreamHandler(Protocol, hs.handleNewStream)
sub, err := hs.host.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{})
if err != nil {
return nil, err
}
hs.refCount.Add(1)
go hs.watchForPublicAddr(sub)

h.Network().Notify((*netNotifiee)(hs))
return hs, nil
}

func (hs *Service) watchForPublicAddr(sub event.Subscription) {
defer hs.refCount.Done()
defer sub.Close()

log.Debug("waiting until we have at least one public address")

for {
if containsPublicAddr(hs.host.Addrs()) {
log.Debug("Host now has a public address. Starting holepunch protocol.")
hs.host.SetStreamHandler(Protocol, hs.handleNewStream)
close(hs.hasPublicAddrsChan)
return
}

select {
case <-hs.ctx.Done():
return
case _, ok := <-sub.Out():
if !ok {
return
}
}
}
}

// Close closes the Hole Punch Service.
func (hs *Service) Close() error {
hs.closeMx.Lock()
Expand Down Expand Up @@ -176,7 +211,6 @@ func (hs *Service) beginDirectConnect(p peer.ID) error {
// It first attempts a direct dial (if we have a public address of that peer), and then
// coordinates a hole punch over the given relay connection.
func (hs *Service) DirectConnect(p peer.ID) error {
log.Debugw("got inbound proxy conn", "peer", p)
if err := hs.beginDirectConnect(p); err != nil {
return err
}
Expand Down Expand Up @@ -221,8 +255,16 @@ func (hs *Service) directConnect(rp peer.ID) error {
}
}

if len(hs.ids.OwnObservedAddrs()) == 0 {
log.Debugw("got inbound proxy conn", "peer", rp)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
select {
case <-hs.ctx.Done():
return hs.ctx.Err()
case <-ctx.Done():
log.Debug("didn't find any public host address")
return errors.New("can't initiate hole punch, as we don't have any public addresses")
case <-hs.hasPublicAddrsChan:
}

// hole punch
Expand Down Expand Up @@ -363,6 +405,16 @@ func (hs *Service) holePunchConnect(pi peer.AddrInfo, isClient bool) error {
return nil
}

func containsPublicAddr(addrs []ma.Multiaddr) bool {
for _, addr := range addrs {
if isRelayAddress(addr) || !manet.IsPublicAddr(addr) {
continue
}
return true
}
return false
}

func removeRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
result := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
Expand Down Expand Up @@ -414,6 +466,7 @@ func (nn *netNotifiee) Connected(_ network.Network, conn network.Conn) {
// that we can dial to for a hole punch.
case <-hs.ids.IdentifyWait(conn):
case <-hs.ctx.Done():
return
}

_ = hs.DirectConnect(conn.RemotePeer())
Expand Down

0 comments on commit 4036a06

Please sign in to comment.