fix: don't wait for the identify exchange to complete
dennis-tra committed Oct 28, 2024
1 parent 138591b commit 1e77f4b
Showing 6 changed files with 62 additions and 150 deletions.
2 changes: 2 additions & 0 deletions db/errors.go
@@ -29,6 +29,7 @@ var KnownErrors = map[string]string{
"max dial attempts exceeded": models.NetErrorMaxDialAttemptsExceeded,
"host is down": models.NetErrorHostIsDown,
"stream reset": models.NetErrorStreamReset,
"stream closed": models.NetErrorStreamReset,
"failed to negotiate security protocol: EOF": models.NetErrorNegotiateSecurityProtocol, // connect retry logic in discv5 relies on the ": EOF" suffix.
"failed to negotiate stream multiplexer": models.NetErrorNegotiateStreamMultiplexer,
"resource limit exceeded": models.NetErrorResourceLimitExceeded,
@@ -91,6 +92,7 @@ var knownErrorsPrecedence = []string{
"max dial attempts exceeded",
"host is down",
"stream reset",
"stream closed",
"failed to negotiate security protocol: EOF",
"failed to negotiate stream multiplexer",
"resource limit exceeded",
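
A plausible way this map and precedence list are consumed is to scan the list in order and return the label of the first known substring found in the error message. Below is a minimal sketch under that assumption; the function name, the fallback value, and the "strings" import are illustrative and not code from this commit:

// netError classifies a raw dial/crawl error by checking the precedence
// list in order and matching on substrings. Sketch only: the name and the
// fallback label are assumed; the real classifier may differ.
func netError(err error) string {
	for _, key := range knownErrorsPrecedence {
		if strings.Contains(err.Error(), key) {
			return KnownErrors[key]
		}
	}
	return "unknown" // assumed fallback
}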
111 changes: 27 additions & 84 deletions discv5/crawler.go
@@ -6,13 +6,14 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
@@ -200,47 +201,28 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
Addrs: sanitizedAddrs,
}

var conn network.Conn
result.ConnectStartTime = time.Now()
result.ConnectError = c.connect(ctx, addrInfo) // use filtered addr list
conn, result.ConnectError = c.connect(ctx, addrInfo) // use filtered addr list
result.ConnectEndTime = time.Now()

// If we could successfully connect to the peer we actually crawl it.
if result.ConnectError == nil {

conns := c.host.Network().ConnsToPeer(pi.ID())

// check if we're connected
if len(conns) == 0 {
// this is a weird behavior I was observing. Libp2p reports a
// successful connection establishment but isn't connected right
// after the call returned. This is not a big problem at this
// point because fetchNeighbors will open the connection again. This
// works more often than not but is still weird. At least keep track
// of these cases.
result.ConnClosedImmediately = true

// try it again one more time
if !c.isIdentified(addrInfo.ID) {
_ = c.connect(ctx, addrInfo)
}
} else if len(conns) == 1 {
// handle happy-path separately
result.Transport = conns[0].ConnState().Transport
} else {
transports := map[string]struct{}{}
for _, conn := range conns {
transports[conn.ConnState().Transport] = struct{}{}
}
// keep track of the transport of the open connection
result.Transport = conn.ConnState().Transport

if len(transports) == 1 {
result.Transport = conns[0].ConnState().Transport
} else if len(transports) != 0 {
result.Transport = "multi"
}
}
// wait for the Identify exchange to complete (no-op if already done)
// the internal timeout is set to 30 s. When crawling we only allow 5s.
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

// wait for the Identify exchange to complete
c.identifyWait(ctx, addrInfo)
select {
case <-timeoutCtx.Done():
// identification timed out.
case <-c.host.IDService().IdentifyWait(conn):
// identification may have succeeded.
}

// Extract information from peer store
ps := c.host.Peerstore()
@@ -285,9 +267,9 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
}

// connect establishes a connection to the given peer. It also handles metric capturing.
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn, error) {
if len(pi.Addrs) == 0 {
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
return nil, fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
}

// init an exponential backoff
Expand All @@ -307,15 +289,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
})
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())

// save addresses into the peer store temporarily
c.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
err := c.host.Connect(timeoutCtx, pi)
conn, err := c.host.Network().DialPeer(timeoutCtx, pi.ID)
cancel()

if err == nil {
return nil
return conn, nil
}

switch true {
switch {
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionRefused]):
// Might be transient because the remote doesn't want us to connect. Try again!
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionGated]):
Expand All @@ -332,18 +317,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// We already have too many open connections over a relay. Try again!
default:
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

sleepDur := bo.NextBackOff()
if sleepDur == backoff.Stop {
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-time.After(sleepDur):
retry += 1
continue
@@ -410,48 +395,6 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) ([]ma.Multiaddr, bool) {
return maddrs, false
}

// identifyWait waits until any connection to a peer passed the Identify
// exchange successfully or all identification attempts have failed.
// The call to IdentifyWait returns immediately if the connection was
// identified in the past. We detect a successful identification if an
// AgentVersion is stored in the peer store
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: parameterize
defer cancel()

var wg sync.WaitGroup
for _, conn := range c.host.Network().ConnsToPeer(pi.ID) {
conn := conn

wg.Add(1)
go func() {
defer wg.Done()

select {
case <-timeoutCtx.Done():
case <-c.host.IDService().IdentifyWait(conn):

// check if identification was successful by looking for
// the AgentVersion key. If it exists, we cancel the
// identification of the remaining connections.
if c.isIdentified(pi.ID) {
cancel()
return
}
}
}()
}

wg.Wait()
}

// isIdentified returns true if the given peer.ID was successfully identified.
// Just because IdentifyWait returns doesn't mean the peer was identified.
func (c *Crawler) isIdentified(pid peer.ID) bool {
agent, err := c.host.Peerstore().Get(pid, "AgentVersion")
return err == nil && agent.(string) != ""
}

type DiscV5Result struct {
// The time we received the first successful response
RespondedAt *time.Time
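
The hunks above are the heart of this commit: instead of the removed per-connection identifyWait helper (deleted further up in this file), the crawler now waits once on the freshly dialed connection, bounded to 5 seconds. A condensed sketch of that pattern as a standalone helper; the helper name is illustrative, while IdentifyWait is the go-libp2p identify-service call used in the hunk above:

// waitForIdentify blocks until the Identify exchange on conn completes or
// the 5-second budget elapses. Returning does not guarantee success; the
// caller inspects the peer store (e.g. the AgentVersion key) afterwards.
func waitForIdentify(ctx context.Context, ids identify.IDService, conn network.Conn) {
	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	select {
	case <-timeoutCtx.Done():
		// identification timed out
	case <-ids.IdentifyWait(conn):
		// identification completed, successfully or not
	}
}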
3 changes: 2 additions & 1 deletion discv5/driver_crawler.go
@@ -297,8 +297,9 @@ func (d *CrawlDriver) Close() {

func newLibp2pHost(version string) (host.Host, error) {
// Configure the resource manager to not limit anything
var noSubnetLimit []rcmgr.ConnLimitPerSubnet
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
rm, err := rcmgr.NewResourceManager(limiter)
rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithLimitPerSubnet(noSubnetLimit, noSubnetLimit))
if err != nil {
return nil, fmt.Errorf("new resource manager: %w", err)
}
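
The hunk above keeps the infinite fixed limiter and additionally passes empty ConnLimitPerSubnet slices, so the host runs without per-subnet connection limits (presumably to keep the crawler's many connections from being capped; that reading is mine, not stated in the commit). A minimal sketch of how such a resource manager is handed to the host; the libp2p.New options are standard go-libp2p API, but their use in the real newLibp2pHost is an assumption since the rest of the function is not visible in this hunk:

// Sketch: an effectively unlimited resource manager wired into a libp2p host.
var noSubnetLimit []rcmgr.ConnLimitPerSubnet
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithLimitPerSubnet(noSubnetLimit, noSubnetLimit))
if err != nil {
	return nil, fmt.Errorf("new resource manager: %w", err)
}
return libp2p.New(
	libp2p.ResourceManager(rm), // standard option for supplying a custom resource manager
	libp2p.UserAgent(version),  // assumed: the crawler's version string as user agent
)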
3 changes: 2 additions & 1 deletion libp2p/crawler.go
@@ -3,6 +3,7 @@ package libp2p
import (
"context"
"encoding/json"
"strings"
"time"

"github.com/benbjohnson/clock"
@@ -119,7 +120,7 @@ func mergeResults(r *core.CrawlResult[PeerInfo], p2pRes P2PResult, apiRes APIRes

// treat ErrConnectionClosedImmediately as no error because we were able
// to connect
if p2pRes.ConnClosedImmediately {
if p2pRes.CrawlError != nil && strings.Contains(p2pRes.CrawlError.Error(), "connection failed") {
properties["direct_close"] = true
}

90 changes: 27 additions & 63 deletions libp2p/crawler_p2p.go
@@ -11,6 +11,7 @@ import (
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
@@ -54,8 +55,8 @@ type P2PResult struct {
// know about that protocol.
ListenAddrs []ma.Multiaddr

// If the connection was closed immediately
ConnClosedImmediately bool
// the transport of a successful connection
Transport string
}

func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
@@ -66,23 +67,16 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
RoutingTable: &core.RoutingTable[PeerInfo]{PeerID: pi.ID()},
}

var conn network.Conn
result.ConnectStartTime = time.Now()
result.ConnectError = c.connect(ctx, pi.AddrInfo) // use filtered addr list
conn, result.ConnectError = c.connect(ctx, pi.AddrInfo) // use filtered addr list
result.ConnectEndTime = time.Now()

// If we could successfully connect to the peer we actually crawl it.
if result.ConnectError == nil {

// check if we're actually connected
if c.host.Network().Connectedness(pi.ID()) == network.NotConnected {
// this is a weird behavior I was observing. Libp2p reports a
// successful connection establishment but isn't connected right
// after the call returned. This is not a big problem at this
// point because drainBuckets will open the connection again.
// This works more often than not but is still weird. At least
// keep track of this issue - just in case.
result.ConnClosedImmediately = true
}
// keep track of the transport of the open connection
result.Transport = conn.ConnState().Transport

// Fetch all neighbors
result.RoutingTable, result.CrawlError = c.drainBuckets(ctx, pi.AddrInfo)
@@ -91,7 +85,16 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
}

// wait for the Identify exchange to complete (no-op if already done)
c.identifyWait(ctx, pi.AddrInfo)
// the internal timeout is set to 30 s. When crawling we only allow 5s.
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

select {
case <-timeoutCtx.Done():
// identification timed out.
case <-c.host.IDService().IdentifyWait(conn):
// identification may have succeeded.
}

// Extract information from peer store
ps := c.host.Peerstore()
@@ -136,9 +139,9 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
}

// connect establishes a connection to the given peer. It also handles metric capturing.
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn, error) {
if len(pi.Addrs) == 0 {
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
return nil, fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
}

// init an exponential backoff
@@ -160,13 +163,16 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
})
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())

// save addresses into the peer store temporarily
c.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
err := c.host.Connect(timeoutCtx, pi)
conn, err := c.host.Network().DialPeer(timeoutCtx, pi.ID)
cancel()

// yay, it worked! Or has it? The caller checks the connectedness again.
if err == nil {
return nil
return conn, nil
}

switch true {
@@ -186,18 +192,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// We already have too many open connections over a relay. Try again!
default:
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

sleepDur := bo.NextBackOff()
if sleepDur == backoff.Stop {
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-time.After(sleepDur):
retry += 1
continue
@@ -302,45 +308,3 @@ func (c *Crawler) drainBucket(ctx context.Context, rt *kbucket.RoutingTable, pid

return nil, fmt.Errorf("getting closest peer with CPL %d: %w", bucket, err)
}

// identifyWait waits until any connection to a peer passed the Identify
// exchange successfully or all identification attempts have failed.
// The call to IdentifyWait returns immediately if the connection was
// identified in the past. We detect a successful identification if an
// AgentVersion is stored in the peer store
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var wg sync.WaitGroup
for _, conn := range c.host.Network().ConnsToPeer(pi.ID) {
conn := conn

wg.Add(1)
go func() {
defer wg.Done()

select {
case <-timeoutCtx.Done():
case <-c.host.IDService().IdentifyWait(conn):

// check if identification was successful by looking for
// the AgentVersion key. If it exists, we cancel the
// identification of the remaining connections.
if c.isIdentified(pi.ID) {
cancel()
return
}
}
}()
}

wg.Wait()
}

// isIdentified returns true if the given peer.ID was successfully identified.
// Just because IdentifyWait returns doesn't mean the peer was identified.
func (c *Crawler) isIdentified(pid peer.ID) bool {
agent, err := c.host.Peerstore().Get(pid, "AgentVersion")
return err == nil && agent.(string) != ""
}
