Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node Explorer #63

Merged
merged 9 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Types of changes (Stanzas):
### Features

- [server] [#54](https://github.com/cosmos/atlas/pull/54) Introduce Tendermint node crawling functionality.
- [webapp] [#63](https://github.com/cosmos/atlas/pull/63) Introduce the node explorer into the UI.

### Fixed

Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/InVisionApp/go-health/v2 v2.1.2
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/dghubble/gologin/v2 v2.2.0
github.com/fatih/color v1.9.0
github.com/go-openapi/spec v0.20.2 // indirect
Expand All @@ -23,17 +22,16 @@ require (
github.com/knadh/koanf v0.13.0
github.com/lib/pq v1.8.0
github.com/microcosm-cc/bluemonday v1.0.4
github.com/onsi/ginkgo v1.14.0 // indirect
github.com/rs/cors v1.7.0
github.com/rs/zerolog v1.20.0
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/satori/go.uuid v1.2.0
github.com/sendgrid/rest v2.6.2+incompatible // indirect
github.com/sendgrid/sendgrid-go v3.7.0+incompatible
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
github.com/swaggo/http-swagger v0.0.0-20200308142732-58ac5e232fba
github.com/swaggo/swag v1.7.0
github.com/tendermint/tendermint v0.32.8
github.com/tendermint/tendermint v0.34.7
github.com/urfave/cli/v2 v2.3.0
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43
golang.org/x/sys v0.0.0-20210123231150-1d476976d117 // indirect
Expand Down
375 changes: 336 additions & 39 deletions go.sum

Large diffs are not rendered by default.

195 changes: 135 additions & 60 deletions server/crawl/node_crawler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package crawl

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/harwoeck/ipstack"
Expand All @@ -25,13 +27,18 @@ const (
// Crawler implements the Tendermint p2p network crawler.
type Crawler struct {
logger zerolog.Logger
db *gorm.DB
pool *NodePool
ipClient *ipstack.Client
locCache *lru.ARCCache
seeds []string
doneCh chan struct{}

mtx sync.Mutex
db *gorm.DB
// tmpPeers is buffer that is used to hold peers that will later be added to
// the pool after a crawl is complete.
tmpPeers []Peer

crawlInterval time.Duration
recheckInterval time.Duration
}
Expand Down Expand Up @@ -87,18 +94,53 @@ func (c *Crawler) Start() {
case <-ticker.C:
c.logger.Info().Msg("starting to crawl nodes")

// reset the peer buffer
c.mtx.Lock()
c.tmpPeers = make([]Peer, 0)
c.mtx.Unlock()

var wg sync.WaitGroup
nc := 0
start := time.Now()

// Keep picking a pseudo-random node from the pool to crawl until the pool
// is exhausted.
peer, ok := c.pool.RandomNode()
for ok {
c.CrawlNode(peer)
wg.Add(1)
c.pool.DeleteNode(peer)

go func(p Peer) {
defer wg.Done()
c.CrawlNode(p)
}(peer)

// pick the next pseudo-random node
peer, ok = c.pool.RandomNode()

if nc%50 == 0 {
c.logger.Info().Int("size", c.pool.Size()).Msg("node pool size")
}

nc++
}

// wait for all crawlers to complete
wg.Wait()

// add all peers from the temp buffer to the node pool
c.mtx.Lock()
for _, p := range c.tmpPeers {
c.logger.Debug().Str("rpc_address", p.RPCAddr).Msg("adding peer to node pool")
c.pool.AddNode(p)
}
c.mtx.Unlock()

c.logger.Info().Msg("node crawl complete; reseeding node pool")
elapsed := time.Since(start).Seconds()

c.logger.Info().Int("num_crawled", nc).
Float64("elapsed", elapsed).
Msg("node crawl complete; reseeding node pool")
c.pool.Reseed()

case <-c.doneCh:
Expand Down Expand Up @@ -130,11 +172,15 @@ func (c *Crawler) RecheckNodes() {
nodeP2PAddr := fmt.Sprintf("%s:%s", node.Address, node.P2PPort)
nodeRPCAddr := fmt.Sprintf("http://%s:%s", node.Address, node.RPCPort)

c.logger.Debug().
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", nodeRPCAddr).
Msg("adding node to node pool")
c.pool.AddNode(Peer{RPCAddr: nodeRPCAddr, Network: node.Network})
p := Peer{RPCAddr: nodeRPCAddr, Network: node.Network}
if !c.pool.HasNode(p) {
c.logger.Debug().
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", nodeRPCAddr).
Time("last_sync", node.UpdatedAt).
Msg("adding stale node to node pool")
c.pool.AddNode(p)
}
}

case <-c.doneCh:
Expand All @@ -161,10 +207,14 @@ func (c *Crawler) CrawlNode(p Peer) {
Network: p.Network,
}

c.logger.Debug().
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", p.RPCAddr).
Msg("pinging node...")
var deleteNode bool
defer func() {
if deleteNode {
c.deleteNode(node)
}
}()

c.logger.Debug().Str("p2p_address", nodeP2PAddr).Str("rpc_address", p.RPCAddr).Msg("pinging node...")

// Attempt to ping the node where upon failure, we remove the node from the
// database.
Expand All @@ -174,36 +224,41 @@ func (c *Crawler) CrawlNode(p Peer) {
Str("rpc_address", p.RPCAddr).
Msg("failed to ping node; deleting...")

if err := node.Delete(c.db); err != nil {
c.logger.Error().
Err(err).
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", p.RPCAddr).
Msg("failed to delete node")
}

deleteNode = true
return
}

// Grab the node's geolocation information. Failure indicates we
// continue to crawl the node.
// Grab the node's geolocation information where upon failure, we remove the
// node from the database.
loc, err := c.GetGeolocation(node.Address)
if err != nil {
c.logger.Error().
Err(err).
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", p.RPCAddr).
Msg("failed to get node geolocation")
Msg("failed to get node geolocation; deleting...")

deleteNode = true
return
}

node.Location = loc
client := newRPCClient(p.RPCAddr, clientTimeout)

// Attempt to get the node's status which provides us with rich information
// about the node. Upon failure, we return and prevent further crawling if the
// network is unknown due to the lack of any useful information about the node.
status, err := client.Status()
client, err := newRPCClient(p.RPCAddr, clientTimeout)
if err != nil {
c.logger.Error().
Err(err).
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", p.RPCAddr).
Msg("failed to create RPC client")

return
}

// Attempt to get the node's status which provides us with node metadata.
// Upon failure, we return and prevent further crawling if the network is
// unknown due to the lack of any useful information about the node.
status, err := client.Status(context.Background())
if err != nil {
c.logger.Error().
Err(err).
Expand All @@ -212,56 +267,48 @@ func (c *Crawler) CrawlNode(p Peer) {
Msg("failed to get node status")

if node.Network == "" {
deleteNode = true
return
}
} else {
node.Moniker = status.NodeInfo.Moniker
node.NodeID = string(status.NodeInfo.ID())
node.Network = status.NodeInfo.Network
node.Version = status.NodeInfo.Version
node.TxIndex = status.NodeInfo.Other.TxIndex

netInfo, err := client.NetInfo()
if node.Network == "" {
node.Network = status.NodeInfo.Network
}

netInfo, err := client.NetInfo(context.Background())
if err != nil {
c.logger.Error().
Err(err).
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", p.RPCAddr).
Msg("failed to get node net info")
return
}

// add the relevant peers to pool
for _, p := range netInfo.Peers {
peerRPCPort := parsePort(p.NodeInfo.Other.RPCAddress)
peerRPCAddress := fmt.Sprintf("http://%s:%s", p.RemoteIP, peerRPCPort)

// only add the peer to the pool if we haven't (re)discovered it
_, err := models.QueryNode(
c.db,
map[string]interface{}{"address": p.RemoteIP, "network": node.Network},
)
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
c.logger.Debug().
Str("peer_rpc_address", peerRPCAddress).
Msg("adding peer to node pool")
c.pool.AddNode(Peer{RPCAddr: peerRPCAddress, Network: node.Network})
} else {
// Add the relevant peers to the temp buffer which will later be added to
// the node pool.
for _, p := range netInfo.Peers {
peerRPCPort := parsePort(p.NodeInfo.Other.RPCAddress)
peerRPCAddress := fmt.Sprintf("http://%s:%s", p.RemoteIP, peerRPCPort)

// only add the peer to the pool if we haven't (re)discovered it
_, err := models.QueryNode(
c.db,
map[string]interface{}{"address": p.RemoteIP, "network": node.Network},
)
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
c.mtx.Lock()
c.tmpPeers = append(c.tmpPeers, Peer{RPCAddr: peerRPCAddress, Network: node.Network})
c.mtx.Unlock()
}
}
}
}

if _, err := node.Upsert(c.db); err != nil {
c.logger.Error().
Err(err).
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", p.RPCAddr).
Msg("failed to save node")
} else {
c.logger.Info().
Str("p2p_address", nodeP2PAddr).
Str("rpc_address", p.RPCAddr).
Msg("successfully crawled and saved node")
}
c.upsertNode(node)
}

// GetGeolocation returns a Location record containing geolocation information
Expand Down Expand Up @@ -301,3 +348,31 @@ func (c *Crawler) GetGeolocation(addr string) (models.Location, error) {

return loc, nil
}

// deleteNode removes the given node record from the database while holding
// the crawler's mutex. CrawlNode runs in one goroutine per peer, so the lock
// serializes concurrent database writes.
func (c *Crawler) deleteNode(n models.Node) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	err := n.Delete(c.db)
	if err == nil {
		return
	}

	c.logger.Error().Err(err).Str("rpc_address", n.Address).Msg("failed to delete node")
}

// upsertNode inserts or updates the given node record in the database while
// holding the crawler's mutex. CrawlNode runs in one goroutine per peer, so
// the lock serializes concurrent database writes.
func (c *Crawler) upsertNode(n models.Node) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	_, err := n.Upsert(c.db)
	if err != nil {
		c.logger.Error().Err(err).Str("rpc_address", n.Address).Msg("failed to save node")
		return
	}

	c.logger.Info().Str("rpc_address", n.Address).Msg("successfully crawled and saved node")
}
16 changes: 10 additions & 6 deletions server/crawl/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@ import (
"time"

"github.com/harwoeck/ipstack"
rpcclient "github.com/tendermint/tendermint/rpc/client"
libclient "github.com/tendermint/tendermint/rpc/lib/client"
tmrpchttp "github.com/tendermint/tendermint/rpc/client/http"
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"

"github.com/cosmos/atlas/server/models"
)

var clientTimeout = 15 * time.Second
var clientTimeout = 5 * time.Second

func newRPCClient(remote string, timeout time.Duration) (*tmrpchttp.HTTP, error) {
httpClient, err := jsonrpcclient.DefaultHTTPClient(remote)
if err != nil {
return nil, err
}

func newRPCClient(remote string, timeout time.Duration) *rpcclient.HTTP {
httpClient := libclient.DefaultHTTPClient(remote)
httpClient.Timeout = timeout
return rpcclient.NewHTTPWithClient(remote, "/websocket", httpClient)
return tmrpchttp.NewWithClient(remote, "/websocket", httpClient)
}

func parsePort(nodeAddr string) string {
Expand Down
Loading