Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reject hole punching attempts when we don't have any public addresses #1214

Merged
merged 3 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type BasicHost struct {

network network.Network
mux *msmux.MultistreamMuxer
ids *identify.IDService
ids identify.IDService
hps *holepunch.Service
pings *ping.PingService
natmgr NATManager
Expand Down Expand Up @@ -542,7 +542,7 @@ func (h *BasicHost) Mux() protocol.Switch {
}

// IDService returns
func (h *BasicHost) IDService() *identify.IDService {
func (h *BasicHost) IDService() identify.IDService {
return h.ids
}

Expand Down
114 changes: 95 additions & 19 deletions p2p/protocol/holepunch/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Service struct {
ctx context.Context
ctxCancel context.CancelFunc

ids *identify.IDService
ids identify.IDService
host host.Host

tracer *tracer
Expand All @@ -56,6 +56,8 @@ type Service struct {
closed bool
refCount sync.WaitGroup

hasPublicAddrsChan chan struct{} // this chan is closed as soon as we have a public address

// active hole punches for deduplicating
activeMx sync.Mutex
active map[peer.ID]struct{}
Expand All @@ -64,18 +66,19 @@ type Service struct {
type Option func(*Service) error

// NewService creates a new service that can be used for hole punching
func NewService(h host.Host, ids *identify.IDService, opts ...Option) (*Service, error) {
func NewService(h host.Host, ids identify.IDService, opts ...Option) (*Service, error) {
if ids == nil {
return nil, errors.New("identify service can't be nil")
}

ctx, cancel := context.WithCancel(context.Background())
hs := &Service{
ctx: ctx,
ctxCancel: cancel,
host: h,
ids: ids,
active: make(map[peer.ID]struct{}),
ctx: ctx,
ctxCancel: cancel,
host: h,
ids: ids,
active: make(map[peer.ID]struct{}),
hasPublicAddrsChan: make(chan struct{}),
}

for _, opt := range opts {
Expand All @@ -85,11 +88,47 @@ func NewService(h host.Host, ids *identify.IDService, opts ...Option) (*Service,
}
}

h.SetStreamHandler(Protocol, hs.handleNewStream)
hs.refCount.Add(1)
go hs.watchForPublicAddr()

h.Network().Notify((*netNotifiee)(hs))
return hs, nil
}

func (hs *Service) watchForPublicAddr() {
defer hs.refCount.Done()

log.Debug("waiting until we have at least one public address", "peer", hs.host.ID())

// TODO: We should have an event here that fires when identify discovers a new
// address (and when autonat confirms that address).
// As we currently don't have an event like this, just check our observed addresses
// regularly (exponential backoff starting at 250 ms, capped at 5s).
duration := 250 * time.Millisecond
const maxDuration = 5 * time.Second
t := time.NewTimer(duration)
defer t.Stop()
for {
if containsPublicAddr(hs.ids.OwnObservedAddrs()) {
log.Debug("Host now has a public address. Starting holepunch protocol.")
hs.host.SetStreamHandler(Protocol, hs.handleNewStream)
close(hs.hasPublicAddrsChan)
return
}

select {
case <-hs.ctx.Done():
return
case <-t.C:
duration *= 2
if duration > maxDuration {
duration = maxDuration
}
t.Reset(duration)
}
}
}

// Close closes the Hole Punch Service.
func (hs *Service) Close() error {
hs.closeMx.Lock()
Expand Down Expand Up @@ -119,7 +158,7 @@ func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration,
// send a CONNECT and start RTT measurement.
msg := &pb.HolePunch{
Type: pb.HolePunch_CONNECT.Enum(),
ObsAddrs: addrsToBytes(hs.ids.OwnObservedAddrs()),
ObsAddrs: addrsToBytes(removeRelayAddrs(hs.ids.OwnObservedAddrs())),
}

start := time.Now()
Expand All @@ -141,8 +180,10 @@ func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration,
str.Reset()
return nil, 0, fmt.Errorf("expect CONNECT message, got %s", t)
}

addrs := addrsFromBytes(msg.ObsAddrs)
addrs := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
if len(addrs) == 0 {
str.Reset()
}

msg.Reset()
msg.Type = pb.HolePunch_SYNC.Enum()
Expand Down Expand Up @@ -174,7 +215,6 @@ func (hs *Service) beginDirectConnect(p peer.ID) error {
// It first attempts a direct dial (if we have a public address of that peer), and then
// coordinates a hole punch over the given relay connection.
func (hs *Service) DirectConnect(p peer.ID) error {
log.Debugw("got inbound proxy conn", "peer", p)
if err := hs.beginDirectConnect(p); err != nil {
return err
}
Expand Down Expand Up @@ -219,6 +259,18 @@ func (hs *Service) directConnect(rp peer.ID) error {
}
}

log.Debugw("got inbound proxy conn", "peer", rp)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
select {
case <-hs.ctx.Done():
return hs.ctx.Err()
case <-ctx.Done():
log.Debug("didn't find any public host address")
return errors.New("can't initiate hole punch, as we don't have any public addresses")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make this a named error, we use it in a few places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't have to be public.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep them different for now. This will make debugging easier.

case <-hs.hasPublicAddrsChan:
}

// hole punch
for i := 0; i < maxRetries; i++ {
addrs, rtt, err := hs.initiateHolePunch(rp)
Expand Down Expand Up @@ -260,6 +312,11 @@ func (hs *Service) incomingHolePunch(s network.Stream) (rtt time.Duration, addrs
if !isRelayAddress(s.Conn().RemoteMultiaddr()) {
return 0, nil, fmt.Errorf("received hole punch stream: %s", s.Conn().RemoteMultiaddr())
}
ownAddrs := removeRelayAddrs(hs.ids.OwnObservedAddrs())
// If we can't tell the peer where to dial us, there's no point in starting the hole punching.
if len(ownAddrs) == 0 {
return 0, nil, errors.New("rejecting hole punch request, as we don't have any public addresses")
}

s.SetDeadline(time.Now().Add(StreamTimeout))
wr := protoio.NewDelimitedWriter(s)
Expand All @@ -273,13 +330,16 @@ func (hs *Service) incomingHolePunch(s network.Stream) (rtt time.Duration, addrs
if t := msg.GetType(); t != pb.HolePunch_CONNECT {
return 0, nil, fmt.Errorf("expected CONNECT message from initiator but got %d", t)
}
obsDial := addrsFromBytes(msg.ObsAddrs)
obsDial := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
log.Debugw("received hole punch request", "peer", s.Conn().RemotePeer(), "addrs", obsDial)
if len(obsDial) == 0 {
return 0, nil, errors.New("expected CONNECT message to contain at least one address")
}

// Write CONNECT message
msg.Reset()
msg.Type = pb.HolePunch_CONNECT.Enum()
msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs())
msg.ObsAddrs = addrsToBytes(ownAddrs)
tstart := time.Now()
if err := wr.WriteMsg(msg); err != nil {
return 0, nil, fmt.Errorf("failed to write CONNECT message to initator: %w", err)
Expand Down Expand Up @@ -327,11 +387,6 @@ func (hs *Service) handleNewStream(s network.Stream) {
err = hs.holePunchConnect(pi, false)
dt := time.Since(start)
hs.tracer.EndHolePunch(rp, dt, err)
if err != nil {
log.Debugw("hole punching failed", "peer", rp, "time", dt, "error", err)
} else {
log.Debugw("hole punching succeeded", "peer", rp, "time", dt)
}
}

func (hs *Service) holePunchConnect(pi peer.AddrInfo, isClient bool) error {
Expand All @@ -349,6 +404,26 @@ func (hs *Service) holePunchConnect(pi peer.AddrInfo, isClient bool) error {
return nil
}

func containsPublicAddr(addrs []ma.Multiaddr) bool {
for _, addr := range addrs {
if isRelayAddress(addr) || !manet.IsPublicAddr(addr) {
continue
}
return true
}
return false
}

func removeRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
result := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
if !isRelayAddress(addr) {
result = append(result, addr)
}
}
return result
}

func isRelayAddress(a ma.Multiaddr) bool {
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
Expand Down Expand Up @@ -390,6 +465,7 @@ func (nn *netNotifiee) Connected(_ network.Network, conn network.Conn) {
// that we can dial to for a hole punch.
case <-hs.ids.IdentifyWait(conn):
case <-hs.ctx.Done():
return
}

_ = hs.DirectConnect(conn.RemotePeer())
Expand Down
Loading