Skip to content

Commit

Permalink
Observer - P2P network crawler
Browse files Browse the repository at this point in the history
Observer crawls the Ethereum network and collects information about the nodes.
  • Loading branch information
battlmonstr committed Apr 21, 2022
1 parent cedb486 commit 737c00f
Show file tree
Hide file tree
Showing 31 changed files with 4,101 additions and 2 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ COMMANDS += downloader
COMMANDS += evm
COMMANDS += hack
COMMANDS += integration
COMMANDS += observer
COMMANDS += pics
COMMANDS += rpcdaemon
COMMANDS += rpctest
Expand Down
35 changes: 35 additions & 0 deletions cmd/observer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Observer - P2P network crawler

Observer crawls the Ethereum network and collects information about the nodes.

### Build

make observer

### Run

observer --datadir ... --nat extip:<IP> --port <PORT>

Where `IP` is your public IP, and `PORT` has to be open for incoming UDP traffic.

See `observer --help` for available options.

### Report

To get the report about the currently known network state run:

observer report --datadir ...

## Description

Observer uses [discv4](https://github.com/ethereum/devp2p/blob/master/discv4.md) protocol to discover new nodes.
Starting from a list of preconfigured "bootnodes" it uses FindNode
to obtain their "neighbor" nodes, and then recursively crawls neighbors of neighbors and so on.
Each found node is re-crawled again a few times.
If the node fails to be pinged after maximum attempts, it is considered "dead", but still re-crawled less often.

A separate "diplomacy" process is doing "handshakes" to obtain information about the discovered nodes.
It tries to get [RLPx Hello](https://github.com/ethereum/devp2p/blob/master/rlpx.md#hello-0x00)
and [Eth Status](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#status-0x00)
from each node.
The handshake repeats a few times according to the configured delays.
70 changes: 70 additions & 0 deletions cmd/observer/database/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package database

import (
"context"
"net"
"time"
)

type NodeID string

type NodeAddr1 struct {
IP net.IP
PortDisc uint16
PortRLPx uint16
}

type NodeAddr struct {
NodeAddr1
IPv6 NodeAddr1
}

type HandshakeError struct {
StringCode string
Time time.Time
}

type DB interface {
UpsertNodeAddr(ctx context.Context, id NodeID, addr NodeAddr) error
FindNodeAddr(ctx context.Context, id NodeID) (*NodeAddr, error)

ResetPingError(ctx context.Context, id NodeID) error
UpdatePingError(ctx context.Context, id NodeID) error
CountPingErrors(ctx context.Context, id NodeID) (*uint, error)

UpdateClientID(ctx context.Context, id NodeID, clientID string) error
UpdateNetworkID(ctx context.Context, id NodeID, networkID uint) error
UpdateEthVersion(ctx context.Context, id NodeID, ethVersion uint) error
UpdateHandshakeTransientError(ctx context.Context, id NodeID, hasTransientErr bool) error
InsertHandshakeError(ctx context.Context, id NodeID, handshakeErr string) error
DeleteHandshakeErrors(ctx context.Context, id NodeID) error
FindHandshakeLastErrors(ctx context.Context, id NodeID, limit uint) ([]HandshakeError, error)
UpdateHandshakeRetryTime(ctx context.Context, id NodeID, retryTime time.Time) error
FindHandshakeRetryTime(ctx context.Context, id NodeID) (*time.Time, error)
CountHandshakeCandidates(ctx context.Context) (uint, error)
FindHandshakeCandidates(ctx context.Context, limit uint) ([]NodeID, error)
MarkTakenHandshakeCandidates(ctx context.Context, nodes []NodeID) error
// TakeHandshakeCandidates runs FindHandshakeCandidates + MarkTakenHandshakeCandidates in a transaction.
TakeHandshakeCandidates(ctx context.Context, limit uint) ([]NodeID, error)

UpdateForkCompatibility(ctx context.Context, id NodeID, isCompatFork bool) error

UpdateNeighborBucketKeys(ctx context.Context, id NodeID, keys []string) error
FindNeighborBucketKeys(ctx context.Context, id NodeID) ([]string, error)

UpdateCrawlRetryTime(ctx context.Context, id NodeID, retryTime time.Time) error
CountCandidates(ctx context.Context) (uint, error)
FindCandidates(ctx context.Context, limit uint) ([]NodeID, error)
MarkTakenNodes(ctx context.Context, nodes []NodeID) error
// TakeCandidates runs FindCandidates + MarkTakenNodes in a transaction.
TakeCandidates(ctx context.Context, limit uint) ([]NodeID, error)

IsConflictError(err error) bool

CountNodes(ctx context.Context, maxPingTries uint, networkID uint) (uint, error)
CountIPs(ctx context.Context, maxPingTries uint, networkID uint) (uint, error)
CountClients(ctx context.Context, clientIDPrefix string, maxPingTries uint, networkID uint) (uint, error)
CountClientsWithNetworkID(ctx context.Context, clientIDPrefix string, maxPingTries uint) (uint, error)
CountClientsWithHandshakeTransientError(ctx context.Context, clientIDPrefix string, maxPingTries uint) (uint, error)
EnumerateClientIDs(ctx context.Context, maxPingTries uint, networkID uint, enumFunc func(clientID *string)) error
}
240 changes: 240 additions & 0 deletions cmd/observer/database/db_retrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package database

import (
"context"
"github.com/ledgerwatch/erigon/cmd/observer/utils"
"github.com/ledgerwatch/log/v3"
"math/rand"
"time"
)

type DBRetrier struct {
db DB
log log.Logger
}

func NewDBRetrier(db DB, logger log.Logger) DBRetrier {
return DBRetrier{db, logger}
}

func retryBackoffTime(attempt int) time.Duration {
if attempt <= 0 {
return 0
}
jitter := rand.Int63n(30 * time.Millisecond.Nanoseconds() * int64(attempt))
var ns int64
if attempt <= 6 {
ns = ((50 * time.Millisecond.Nanoseconds()) << (attempt - 1)) + jitter
} else {
ns = 1600*time.Millisecond.Nanoseconds() + jitter
}
return time.Duration(ns)
}

func (db DBRetrier) retry(ctx context.Context, opName string, op func(context.Context) (interface{}, error)) (interface{}, error) {
const retryCount = 40
return utils.Retry(ctx, retryCount, retryBackoffTime, db.db.IsConflictError, db.log, opName, op)
}

func (db DBRetrier) UpsertNodeAddr(ctx context.Context, id NodeID, addr NodeAddr) error {
_, err := db.retry(ctx, "UpsertNodeAddr", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpsertNodeAddr(ctx, id, addr)
})
return err
}

func (db DBRetrier) FindNodeAddr(ctx context.Context, id NodeID) (*NodeAddr, error) {
resultAny, err := db.retry(ctx, "FindNodeAddr", func(ctx context.Context) (interface{}, error) {
return db.db.FindNodeAddr(ctx, id)
})

if resultAny == nil {
return nil, err
}
result := resultAny.(*NodeAddr)
return result, err
}

func (db DBRetrier) ResetPingError(ctx context.Context, id NodeID) error {
_, err := db.retry(ctx, "ResetPingError", func(ctx context.Context) (interface{}, error) {
return nil, db.db.ResetPingError(ctx, id)
})
return err
}

func (db DBRetrier) UpdatePingError(ctx context.Context, id NodeID) error {
_, err := db.retry(ctx, "UpdatePingError", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdatePingError(ctx, id)
})
return err
}

func (db DBRetrier) CountPingErrors(ctx context.Context, id NodeID) (*uint, error) {
resultAny, err := db.retry(ctx, "CountPingErrors", func(ctx context.Context) (interface{}, error) {
return db.db.CountPingErrors(ctx, id)
})

if resultAny == nil {
return nil, err
}
result := resultAny.(*uint)
return result, err
}

func (db DBRetrier) UpdateClientID(ctx context.Context, id NodeID, clientID string) error {
_, err := db.retry(ctx, "UpdateClientID", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateClientID(ctx, id, clientID)
})
return err
}

func (db DBRetrier) UpdateNetworkID(ctx context.Context, id NodeID, networkID uint) error {
_, err := db.retry(ctx, "UpdateNetworkID", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateNetworkID(ctx, id, networkID)
})
return err
}

func (db DBRetrier) UpdateEthVersion(ctx context.Context, id NodeID, ethVersion uint) error {
_, err := db.retry(ctx, "UpdateEthVersion", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateEthVersion(ctx, id, ethVersion)
})
return err
}

func (db DBRetrier) UpdateHandshakeTransientError(ctx context.Context, id NodeID, hasTransientErr bool) error {
_, err := db.retry(ctx, "UpdateHandshakeTransientError", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateHandshakeTransientError(ctx, id, hasTransientErr)
})
return err
}

func (db DBRetrier) InsertHandshakeError(ctx context.Context, id NodeID, handshakeErr string) error {
_, err := db.retry(ctx, "InsertHandshakeError", func(ctx context.Context) (interface{}, error) {
return nil, db.db.InsertHandshakeError(ctx, id, handshakeErr)
})
return err
}

func (db DBRetrier) DeleteHandshakeErrors(ctx context.Context, id NodeID) error {
_, err := db.retry(ctx, "DeleteHandshakeErrors", func(ctx context.Context) (interface{}, error) {
return nil, db.db.DeleteHandshakeErrors(ctx, id)
})
return err
}

func (db DBRetrier) FindHandshakeLastErrors(ctx context.Context, id NodeID, limit uint) ([]HandshakeError, error) {
resultAny, err := db.retry(ctx, "FindHandshakeLastErrors", func(ctx context.Context) (interface{}, error) {
return db.db.FindHandshakeLastErrors(ctx, id, limit)
})

if resultAny == nil {
return nil, err
}
result := resultAny.([]HandshakeError)
return result, err
}

func (db DBRetrier) UpdateHandshakeRetryTime(ctx context.Context, id NodeID, retryTime time.Time) error {
_, err := db.retry(ctx, "UpdateHandshakeRetryTime", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateHandshakeRetryTime(ctx, id, retryTime)
})
return err
}

func (db DBRetrier) FindHandshakeRetryTime(ctx context.Context, id NodeID) (*time.Time, error) {
resultAny, err := db.retry(ctx, "FindHandshakeRetryTime", func(ctx context.Context) (interface{}, error) {
return db.db.FindHandshakeRetryTime(ctx, id)
})

if resultAny == nil {
return nil, err
}
result := resultAny.(*time.Time)
return result, err
}

func (db DBRetrier) CountHandshakeCandidates(ctx context.Context) (uint, error) {
resultAny, err := db.retry(ctx, "CountHandshakeCandidates", func(ctx context.Context) (interface{}, error) {
return db.db.CountHandshakeCandidates(ctx)
})

if resultAny == nil {
return 0, err
}
result := resultAny.(uint)
return result, err
}

func (db DBRetrier) TakeHandshakeCandidates(ctx context.Context, limit uint) ([]NodeID, error) {
resultAny, err := db.retry(ctx, "TakeHandshakeCandidates", func(ctx context.Context) (interface{}, error) {
return db.db.TakeHandshakeCandidates(ctx, limit)
})

if resultAny == nil {
return nil, err
}
result := resultAny.([]NodeID)
return result, err
}

func (db DBRetrier) UpdateForkCompatibility(ctx context.Context, id NodeID, isCompatFork bool) error {
_, err := db.retry(ctx, "UpdateForkCompatibility", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateForkCompatibility(ctx, id, isCompatFork)
})
return err
}

func (db DBRetrier) UpdateNeighborBucketKeys(ctx context.Context, id NodeID, keys []string) error {
_, err := db.retry(ctx, "UpdateNeighborBucketKeys", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateNeighborBucketKeys(ctx, id, keys)
})
return err
}

func (db DBRetrier) FindNeighborBucketKeys(ctx context.Context, id NodeID) ([]string, error) {
resultAny, err := db.retry(ctx, "FindNeighborBucketKeys", func(ctx context.Context) (interface{}, error) {
return db.db.FindNeighborBucketKeys(ctx, id)
})

if resultAny == nil {
return nil, err
}
result := resultAny.([]string)
return result, err
}

func (db DBRetrier) UpdateCrawlRetryTime(ctx context.Context, id NodeID, retryTime time.Time) error {
_, err := db.retry(ctx, "UpdateCrawlRetryTime", func(ctx context.Context) (interface{}, error) {
return nil, db.db.UpdateCrawlRetryTime(ctx, id, retryTime)
})
return err
}

func (db DBRetrier) CountCandidates(ctx context.Context) (uint, error) {
resultAny, err := db.retry(ctx, "CountCandidates", func(ctx context.Context) (interface{}, error) {
return db.db.CountCandidates(ctx)
})

if resultAny == nil {
return 0, err
}
result := resultAny.(uint)
return result, err
}

func (db DBRetrier) TakeCandidates(ctx context.Context, limit uint) ([]NodeID, error) {
resultAny, err := db.retry(ctx, "TakeCandidates", func(ctx context.Context) (interface{}, error) {
return db.db.TakeCandidates(ctx, limit)
})

if resultAny == nil {
return nil, err
}
result := resultAny.([]NodeID)
return result, err
}

func (db DBRetrier) IsConflictError(err error) bool {
return db.db.IsConflictError(err)
}
Loading

0 comments on commit 737c00f

Please sign in to comment.