diff --git a/db/errors.go b/db/errors.go index 5062b18..e87cd13 100644 --- a/db/errors.go +++ b/db/errors.go @@ -29,6 +29,7 @@ var KnownErrors = map[string]string{ "max dial attempts exceeded": models.NetErrorMaxDialAttemptsExceeded, "host is down": models.NetErrorHostIsDown, "stream reset": models.NetErrorStreamReset, + "stream closed": models.NetErrorStreamReset, "failed to negotiate security protocol: EOF": models.NetErrorNegotiateSecurityProtocol, // connect retry logic in discv5 relies on the ": EOF" suffix. "failed to negotiate stream multiplexer": models.NetErrorNegotiateStreamMultiplexer, "resource limit exceeded": models.NetErrorResourceLimitExceeded, @@ -91,6 +92,7 @@ var knownErrorsPrecedence = []string{ "max dial attempts exceeded", "host is down", "stream reset", + "stream closed", "failed to negotiate security protocol: EOF", "failed to negotiate stream multiplexer", "resource limit exceeded", diff --git a/discv5/crawler.go b/discv5/crawler.go index 037cc4b..ebc000c 100644 --- a/discv5/crawler.go +++ b/discv5/crawler.go @@ -6,13 +6,14 @@ import ( "errors" "fmt" "strings" - "sync" "time" "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" basichost "github.com/libp2p/go-libp2p/p2p/host/basic" ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" @@ -200,47 +201,28 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul Addrs: sanitizedAddrs, } + var conn network.Conn result.ConnectStartTime = time.Now() - result.ConnectError = c.connect(ctx, addrInfo) // use filtered addr list + conn, result.ConnectError = c.connect(ctx, addrInfo) // use filtered addr list result.ConnectEndTime = time.Now() // If we could successfully connect to the peer we actually crawl it. if result.ConnectError == nil { - conns := c.host.Network().ConnsToPeer(pi.ID()) - - // check if we're connected - if len(conns) == 0 { - // this is a weird behavior I was obesrving. Libp2p reports a - // successful connection establishment but isn't connected right - // after the call returned. This point is not a big problem at this - // point because fetchNeighbors will open the connection again. This - // works more often than not but is still weird. At least keep track - // of these cases. - result.ConnClosedImmediately = true - - // try it again one more time - if !c.isIdentified(addrInfo.ID) { - _ = c.connect(ctx, addrInfo) - } - } else if len(conns) == 1 { - // handle happy-path separately - result.Transport = conns[0].ConnState().Transport - } else { - transports := map[string]struct{}{} - for _, conn := range conns { - transports[conn.ConnState().Transport] = struct{}{} - } + // keep track of the transport of the open connection + result.Transport = conn.ConnState().Transport - if len(transports) == 1 { - result.Transport = conns[0].ConnState().Transport - } else if len(transports) != 0 { - result.Transport = "multi" - } - } + // wait for the Identify exchange to complete (no-op if already done) + // the internal timeout is set to 30 s. When crawling we only allow 5s. + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() - // wait for the Identify exchange to complete - c.identifyWait(ctx, addrInfo) + select { + case <-timeoutCtx.Done(): + // identification timed out. + case <-c.host.IDService().IdentifyWait(conn): + // identification may have succeeded. + } // Extract information from peer store ps := c.host.Peerstore() @@ -285,9 +267,9 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul } // connect establishes a connection to the given peer. It also handles metric capturing. -func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error { +func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn, error) { if len(pi.Addrs) == 0 { - return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg + return nil, fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg } // init an exponential backoff @@ -307,15 +289,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error { }) logEntry.Debugln("Connecting to peer", pi.ID.ShortString()) + // save addresses into the peer store temporarily + c.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) + timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout) - err := c.host.Connect(timeoutCtx, pi) + conn, err := c.host.Network().DialPeer(timeoutCtx, pi.ID) cancel() if err == nil { - return nil + return conn, nil } - switch true { + switch { case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionRefused]): // Might be transient because the remote doesn't want us to connect. Try again! case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionGated]): @@ -332,18 +317,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error { // We already have too many open connections over a relay. Try again! default: logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString()) - return err + return nil, err } sleepDur := bo.NextBackOff() if sleepDur == backoff.Stop { logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString()) - return err + return nil, err } select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-time.After(sleepDur): retry += 1 continue @@ -410,48 +395,6 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) ([]ma.Multiaddr, bool) { return maddrs, false } -// identifyWait waits until any connection to a peer passed the Identify -// exchange successfully or all identification attempts have failed. -// The call to IdentifyWait returns immediately if the connection was -// identified in the past. We detect a successful identification if an -// AgentVersion is stored in the peer store -func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) { - timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: parameterize - defer cancel() - - var wg sync.WaitGroup - for _, conn := range c.host.Network().ConnsToPeer(pi.ID) { - conn := conn - - wg.Add(1) - go func() { - defer wg.Done() - - select { - case <-timeoutCtx.Done(): - case <-c.host.IDService().IdentifyWait(conn): - - // check if identification was successful by looking for - // the AgentVersion key. If it exists, we cancel the - // identification of the remaining connections. - if c.isIdentified(pi.ID) { - cancel() - return - } - } - }() - } - - wg.Wait() -} - -// isIdentified returns true if the given peer.ID was successfully identified. -// Just because IdentifyWait returns doesn't mean the peer was identified. -func (c *Crawler) isIdentified(pid peer.ID) bool { - agent, err := c.host.Peerstore().Get(pid, "AgentVersion") - return err == nil && agent.(string) != "" -} - type DiscV5Result struct { // The time we received the first successful response RespondedAt *time.Time diff --git a/discv5/driver_crawler.go b/discv5/driver_crawler.go index 703cd13..05b1319 100644 --- a/discv5/driver_crawler.go +++ b/discv5/driver_crawler.go @@ -297,8 +297,9 @@ func (d *CrawlDriver) Close() { func newLibp2pHost(version string) (host.Host, error) { // Configure the resource manager to not limit anything + var noSubnetLimit []rcmgr.ConnLimitPerSubnet limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits) - rm, err := rcmgr.NewResourceManager(limiter) + rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithLimitPerSubnet(noSubnetLimit, noSubnetLimit)) if err != nil { return nil, fmt.Errorf("new resource manager: %w", err) } diff --git a/libp2p/crawler.go b/libp2p/crawler.go index b8bb154..5e0f4ea 100644 --- a/libp2p/crawler.go +++ b/libp2p/crawler.go @@ -3,6 +3,7 @@ package libp2p import ( "context" "encoding/json" + "strings" "time" "github.com/benbjohnson/clock" @@ -119,7 +120,7 @@ func mergeResults(r *core.CrawlResult[PeerInfo], p2pRes P2PResult, apiRes APIRes // treat ErrConnectionClosedImmediately as no error because we were able // to connect - if p2pRes.ConnClosedImmediately { + if p2pRes.CrawlError != nil && strings.Contains(p2pRes.CrawlError.Error(), "connection failed") { properties["direct_close"] = true } diff --git a/libp2p/crawler_p2p.go b/libp2p/crawler_p2p.go index 1ab7cc3..d44dbc1 100644 --- a/libp2p/crawler_p2p.go +++ b/libp2p/crawler_p2p.go @@ -11,6 +11,7 @@ import ( kbucket "github.com/libp2p/go-libp2p-kbucket" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" "go.uber.org/atomic" @@ -54,8 +55,8 @@ type P2PResult struct { // know about that protocol. ListenAddrs []ma.Multiaddr - // If the connection was closed immediately - ConnClosedImmediately bool + // the transport of a successful connection + Transport string } func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult { @@ -66,23 +67,16 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult { RoutingTable: &core.RoutingTable[PeerInfo]{PeerID: pi.ID()}, } + var conn network.Conn result.ConnectStartTime = time.Now() - result.ConnectError = c.connect(ctx, pi.AddrInfo) // use filtered addr list + conn, result.ConnectError = c.connect(ctx, pi.AddrInfo) // use filtered addr list result.ConnectEndTime = time.Now() // If we could successfully connect to the peer we actually crawl it. if result.ConnectError == nil { - // check if we're actually connected - if c.host.Network().Connectedness(pi.ID()) == network.NotConnected { - // this is a weird behavior I was obesrving. Libp2p reports a - // successful connection establishment but isn't connected right - // after the call returned. This is not a big problem at this - // point because drainBuckets will open the connection again. - // This works more often than not but is still weird. At least - // keep track of this issue - just in case. - result.ConnClosedImmediately = true - } + // keep track of the transport of the open connection + result.Transport = conn.ConnState().Transport // Fetch all neighbors result.RoutingTable, result.CrawlError = c.drainBuckets(ctx, pi.AddrInfo) @@ -91,7 +85,16 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult { } // wait for the Identify exchange to complete (no-op if already done) - c.identifyWait(ctx, pi.AddrInfo) + // the internal timeout is set to 30 s. When crawling we only allow 5s. + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + select { + case <-timeoutCtx.Done(): + // identification timed out. + case <-c.host.IDService().IdentifyWait(conn): + // identification may have succeeded. + } // Extract information from peer store ps := c.host.Peerstore() @@ -136,9 +139,9 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult { } // connect establishes a connection to the given peer. It also handles metric capturing. -func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error { +func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn, error) { if len(pi.Addrs) == 0 { - return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg + return nil, fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg } // init an exponential backoff @@ -160,13 +163,16 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error { }) logEntry.Debugln("Connecting to peer", pi.ID.ShortString()) + // save addresses into the peer store temporarily + c.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) + timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout) - err := c.host.Connect(timeoutCtx, pi) + conn, err := c.host.Network().DialPeer(timeoutCtx, pi.ID) cancel() // yay, it worked! Or has it? The caller checks the connectedness again. if err == nil { - return nil + return conn, nil } switch true { @@ -186,18 +192,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error { // We already have too many open connections over a relay. Try again! default: logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString()) - return err + return nil, err } sleepDur := bo.NextBackOff() if sleepDur == backoff.Stop { logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString()) - return err + return nil, err } select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-time.After(sleepDur): retry += 1 continue @@ -302,45 +308,3 @@ func (c *Crawler) drainBucket(ctx context.Context, rt *kbucket.RoutingTable, pid return nil, fmt.Errorf("getting closest peer with CPL %d: %w", bucket, err) } - -// identifyWait waits until any connection to a peer passed the Identify -// exchange successfully or all identification attempts have failed. -// The call to IdentifyWait returns immediately if the connection was -// identified in the past. We detect a successful identification if an -// AgentVersion is stored in the peer store -func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) { - timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - var wg sync.WaitGroup - for _, conn := range c.host.Network().ConnsToPeer(pi.ID) { - conn := conn - - wg.Add(1) - go func() { - defer wg.Done() - - select { - case <-timeoutCtx.Done(): - case <-c.host.IDService().IdentifyWait(conn): - - // check if identification was successful by looking for - // the AgentVersion key. If it exists, we cancel the - // identification of the remaining connections. - if c.isIdentified(pi.ID) { - cancel() - return - } - } - }() - } - - wg.Wait() -} - -// isIdentified returns true if the given peer.ID was successfully identified. -// Just because IdentifyWait returns doesn't mean the peer was identified. -func (c *Crawler) isIdentified(pid peer.ID) bool { - agent, err := c.host.Peerstore().Get(pid, "AgentVersion") - return err == nil && agent.(string) != "" -} diff --git a/libp2p/driver_crawler.go b/libp2p/driver_crawler.go index e6aa089..c7f47d3 100644 --- a/libp2p/driver_crawler.go +++ b/libp2p/driver_crawler.go @@ -184,8 +184,9 @@ func (d *CrawlDriver) Close() {} func newLibp2pHost(userAgent string) (Host, error) { // Configure the resource manager to not limit anything + var noSubnetLimit []rcmgr.ConnLimitPerSubnet limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits) - rm, err := rcmgr.NewResourceManager(limiter) + rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithLimitPerSubnet(noSubnetLimit, noSubnetLimit)) if err != nil { return nil, fmt.Errorf("new resource manager: %w", err) }