diff --git a/cmd/tbcd/tbcd.go b/cmd/tbcd/tbcd.go index 5ef41d8f..0bd0662d 100644 --- a/cmd/tbcd/tbcd.go +++ b/cmd/tbcd/tbcd.go @@ -140,11 +140,6 @@ func HandleSignals(ctx context.Context, cancel context.CancelFunc, callback func os.Exit(2) } -func init() { - version.Component = "tbcd" - welcome = "Hemi Tiny Bitcoin Daemon " + version.BuildInfo() -} - func _main() error { // Parse configuration from environment if err := config.Parse(cm); err != nil { @@ -204,6 +199,11 @@ func _main() error { return nil } +func init() { + version.Component = "tbcd" + welcome = "Hemi Tiny Bitcoin Daemon " + version.BuildInfo() +} + func main() { if len(os.Args) != 1 { fmt.Fprintf(os.Stderr, "%v\n", welcome) @@ -215,7 +215,7 @@ func main() { } if err := _main(); err != nil { - log.Errorf("%v", err) + fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } } diff --git a/go.mod b/go.mod index f3f79938..71170ad3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/hemilabs/heminetwork -go 1.22 +go 1.23 toolchain go1.23.0 diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index b2e2d8b4..ca0fea68 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -1309,7 +1309,7 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e return } // get a random peer - p, err := s.randomPeer(ctx) + p, err := s.pm.Random() if err != nil { s.mtx.Unlock() log.Errorf("sync indexers random peer: %v", err) diff --git a/service/tbc/peer.go b/service/tbc/peer.go index fa54d3aa..582258f0 100644 --- a/service/tbc/peer.go +++ b/service/tbc/peer.go @@ -39,6 +39,7 @@ type peer struct { connected time.Time address string + id int protocolVersion uint32 network wire.BitcoinNet @@ -47,12 +48,16 @@ type peer struct { addrV2 bool } -func NewPeer(network wire.BitcoinNet, address string) (*peer, error) { - // XXX parse address and return failure if it's wrong +func NewPeer(network wire.BitcoinNet, id int, address string) (*peer, error) { + _, _, err := net.SplitHostPort(address) + if err != nil { + return nil, fmt.Errorf("%v: %w", address, err) + } return &peer{ protocolVersion: wire.ProtocolVersion, network: network, address: address, + id: id, }, nil } @@ -60,6 +65,10 @@ func (p *peer) String() string { return p.address } +func (p *peer) Id() int { + return p.id +} + func (p *peer) write(timeout time.Duration, msg wire.Message) error { p.mtx.Lock() conn := p.conn @@ -182,8 +191,13 @@ func (p *peer) connect(ctx context.Context) error { p.mtx.Unlock() d := net.Dialer{ - Timeout: 5 * time.Second, - KeepAlive: 9 * time.Second, + Deadline: time.Now().Add(5 * time.Second), + KeepAliveConfig: net.KeepAliveConfig{ + Enable: true, + Idle: 7 * time.Second, + Interval: 7 * time.Second, + Count: 2, + }, } log.Debugf("dialing %s", p.address) @@ -191,7 +205,6 @@ func (p *peer) connect(ctx context.Context) error { if err != nil { return fmt.Errorf("dial %v: %w", p.address, err) } - log.Debugf("done dialing %s", p.address) err = p.handshake(ctx, conn) if err != nil { @@ -212,9 +225,12 @@ func (p *peer) close() error { defer log.Tracef("close exit") p.mtx.Lock() - defer p.mtx.Unlock() - if p.conn != nil { - return p.conn.Close() + conn := p.conn + p.conn = nil + p.isDialing = true // mark not connected + p.mtx.Unlock() + if conn != nil { + return conn.Close() } return net.ErrClosed } diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index b52f5f9c..bd5072ae 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -5,129 +5,414 @@ package tbc import ( + "context" + 
"errors" + "fmt" + "math/rand/v2" "net" "sync" + "time" + + "github.com/btcsuite/btcd/wire" ) const ( - maxPeersGood = 1 << 13 - maxPeersBad = 1 << 13 + maxPeersGood = 1024 + maxPeersBad = 1024 +) + +var ( + testnet3Seeds = []string{ + "testnet-seed.bitcoin.jonasschnelli.ch:18333", + "seed.tbtc.petertodd.org:18333", + "seed.testnet.bitcoin.sprovoost.nl:18333", + "testnet-seed.bluematt.me:18333", + } + mainnetSeeds = []string{ + "seed.bitcoin.sipa.be:8333", + "dnsseed.bluematt.me:8333", + "dnsseed.bitcoin.dashjr.org:8333", + "seed.bitcoinstats.com:8333", + "seed.bitnodes.io:8333", + "seed.bitcoin.jonasschnelli.ch:8333", + } + + ErrReset = errors.New("reset") + ErrNoAddresses = errors.New("no addresses") + ErrDNSSeed = errors.New("could not dns seed") + ErrNoConnectedPeers = errors.New("no connected peers") ) // PeerManager keeps track of the available peers and their quality. type PeerManager struct { - peersMtx sync.RWMutex - peersGood map[string]struct{} - peersBad map[string]struct{} - goodSeenMax int // keep track of max good peers seen to prevent early purge + mtx sync.RWMutex + + net wire.BitcoinNet // bitcoin network to connect to + + want int // number of peers we want to be connected to + + dnsSeeds []string // hard coded dns seeds + seeds []string // seeds obtained from DNS + + peers map[string]*peer // connected peers + good map[string]struct{} + bad map[string]struct{} + + peersC chan *peer // blocking channel for RandomConnect + slotsC chan int } -// newPeerManager returns a new peer manager. -func newPeerManager() *PeerManager { +// NewPeerManager returns a new peer manager. +func NewPeerManager(net wire.BitcoinNet, seeds []string, want int) (*PeerManager, error) { + if want < 1 { + return nil, errors.New("peers wanted must not be 0") + } + + var dnsSeeds []string + switch net { + case wire.MainNet: + dnsSeeds = mainnetSeeds + case wire.TestNet3: + dnsSeeds = testnet3Seeds + case wire.TestNet: + default: + return nil, fmt.Errorf("invalid network: %v", net) + } + return &PeerManager{ - peersGood: make(map[string]struct{}, maxPeersGood), - peersBad: make(map[string]struct{}, maxPeersBad), + net: net, + want: want, + dnsSeeds: dnsSeeds, + seeds: seeds, + good: make(map[string]struct{}, maxPeersGood), + bad: make(map[string]struct{}, maxPeersBad), + peers: make(map[string]*peer, want), + peersC: make(chan *peer, 0), + }, nil +} + +func (pm *PeerManager) String() string { + pm.mtx.RLock() + defer pm.mtx.RUnlock() + + return fmt.Sprintf("connected %v good %v bad %v", + len(pm.peers), len(pm.good), len(pm.bad)) +} + +func (pm *PeerManager) seed(pctx context.Context) error { + log.Tracef("seed") + defer log.Tracef("seed exit") + + // Seed + resolver := &net.Resolver{} + ctx, cancel := context.WithTimeout(pctx, 15*time.Second) + defer cancel() + + for _, v := range pm.dnsSeeds { + host, port, err := net.SplitHostPort(v) + if err != nil { + log.Errorf("Failed to parse host/port: %v", err) + continue + } + ips, err := resolver.LookupIP(ctx, "ip", host) + if err != nil { + log.Errorf("lookup: %v", err) + continue + } + + for _, ip := range ips { + address := net.JoinHostPort(ip.String(), port) + pm.seeds = append(pm.seeds, address) + } + } + + if len(pm.seeds) == 0 { + return ErrDNSSeed } + + return nil } // Stats returns peer statistics. 
-func (pm *PeerManager) Stats() (int, int) { - log.Tracef("PeersStats") - defer log.Tracef("PeersStats exit") +func (pm *PeerManager) Stats() (int, int, int) { + log.Tracef("Stats") + defer log.Tracef("Stats exit") - pm.peersMtx.RLock() - defer pm.peersMtx.RUnlock() - return len(pm.peersGood), len(pm.peersBad) + pm.mtx.RLock() + defer pm.mtx.RUnlock() + return len(pm.peers), len(pm.good), len(pm.bad) } -// PeersInsert adds known peers. -func (pm *PeerManager) PeersInsert(peers []string) error { - log.Tracef("PeersInsert") - defer log.Tracef("PeersInsert exit") - - pm.peersMtx.Lock() +// handleAddr adds peers to the good list if they do not exist in the connected +// and bad list. +// Note that this function requires the mutex to be held. +func (pm *PeerManager) handleAddr(peers []string) { for _, addr := range peers { - if _, ok := pm.peersBad[addr]; ok { + _, _, err := net.SplitHostPort(addr) + if err != nil { + continue + } + if _, ok := pm.peers[addr]; ok { + // Skip connected peers. + continue + } + if _, ok := pm.bad[addr]; ok { // Skip bad peers. continue } - pm.peersGood[addr] = struct{}{} + pm.good[addr] = struct{}{} + } + log.Debugf("HandleAddr exit %v good %v bad %v", + len(peers), len(pm.good), len(pm.bad)) +} + +// HandleAddr adds peers to good list. +func (pm *PeerManager) HandleAddr(peers []string) { + log.Tracef("HandleAddr %v", len(peers)) + + pm.mtx.Lock() + defer pm.mtx.Unlock() + pm.handleAddr(peers) +} + +// Good adds peer to good list if it does not exist in connected and good list already. +func (pm *PeerManager) Good(address string) error { + log.Tracef("Good") + defer log.Tracef("Good exit") + + _, _, err := net.SplitHostPort(address) + if err != nil { + return err } - allGoodPeers := len(pm.peersGood) - allBadPeers := len(pm.peersBad) - pm.peersMtx.Unlock() - log.Debugf("PeersInsert exit %v good %v bad %v", - len(peers), allGoodPeers, allBadPeers) + pm.mtx.Lock() + defer pm.mtx.Unlock() + + // If peer is connected don't add it to good list + if _, ok := pm.peers[address]; ok { + return fmt.Errorf("peer active: %v", address) + } + if _, ok := pm.good[address]; ok { + return fmt.Errorf("peer good: %v", address) + } + + // Remove peer from bad. + delete(pm.bad, address) + // Add peer to good. + pm.good[address] = struct{}{} + + log.Debugf("Good exit peers %v good %v bad %v", + len(pm.peers), len(pm.good), len(pm.bad)) return nil } -// PeerDelete marks the peer as bad. -// XXX this function only returns nil!? -func (pm *PeerManager) PeerDelete(address string) error { - log.Tracef("PeerDelete") - defer log.Tracef("PeerDelete exit") +// Bad marks the peer as bad. +func (pm *PeerManager) Bad(ctx context.Context, address string) error { + log.Tracef("Bad %v", address) + defer log.Tracef("Bad exit") _, _, err := net.SplitHostPort(address) if err != nil { return err } - pm.peersMtx.Lock() + pm.mtx.Lock() + + // If peer is connected, disconnect it and mark it bad + if p, ok := pm.peers[address]; ok { + if p != nil { + // if we don't have a peer we are going to starve the slots + log.Debugf("got address without peer: %v", address) + p.close() + go func() { + // Run outside of mutex + select { + case <-ctx.Done(): + case pm.slotsC <- p.Id(): + } + }() + } + delete(pm.peers, address) + } // Remove peer from good. - delete(pm.peersGood, address) + delete(pm.good, address) // Mark peer as bad. 
- pm.peersBad[address] = struct{}{} + pm.bad[address] = struct{}{} + + log.Debugf("Bad exit peers %v good %v bad %v", + len(pm.peers), len(pm.good), len(pm.bad)) + + pm.mtx.Unlock() + + return nil + } - // Crude hammer to reset good/bad state of peers +// Random returns a random connected peer. +func (pm *PeerManager) Random() (*peer, error) { + log.Tracef("Random") + defer log.Tracef("Random exit") - // XXX goodSeenMax should be a connection test; not a threshold. - // Another reason to move all peer stuff into the manager. - pm.goodSeenMax = max(pm.goodSeenMax, len(pm.peersGood)) - if pm.goodSeenMax > minPeersRequired && len(pm.peersGood) < minPeersRequired { - // Kill all peers to force caller to reseed. This happens when - // network is down for a while and all peers are moved into - // bad map. - clear(pm.peersGood) - clear(pm.peersBad) - pm.peersGood = make(map[string]struct{}, 8192) - pm.peersBad = make(map[string]struct{}, 8192) - pm.goodSeenMax = 0 - log.Debugf("peer cache purged") + pm.mtx.RLock() + defer pm.mtx.RUnlock() + + for _, p := range pm.peers { + if p.isConnected() { + return p, nil + } + } + + return nil, ErrNoConnectedPeers +} + +// All runs a callback on all connected peers. +func (pm *PeerManager) All(ctx context.Context, f func(ctx context.Context, p *peer)) { + log.Tracef("All") + defer log.Tracef("All exit") + + pm.mtx.RLock() + defer pm.mtx.RUnlock() + for _, p := range pm.peers { + if !p.isConnected() { + continue + } + go f(ctx, p) } +} - allGoodPeers := len(pm.peersGood) - allBadPeers := len(pm.peersBad) +// RandomConnect blocks until there is a peer ready to use. +func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { + log.Tracef("RandomConnect") + defer log.Tracef("RandomConnect exit") - pm.peersMtx.Unlock() + // Block until a connect slot opens up + select { + case <-ctx.Done(): + return nil, ctx.Err() + case p := <-pm.peersC: + return p, nil + } +} - log.Debugf("PeerDelete exit good %v bad %v", allGoodPeers, allBadPeers) +func (pm *PeerManager) randomPeer(ctx context.Context, slot int) (*peer, error) { + pm.mtx.Lock() + defer pm.mtx.Unlock() + + // Reset clause + // log.Infof("good %v bad %v seeds %v", len(pm.good), len(pm.bad), len(pm.seeds)) + if len(pm.good) < len(pm.seeds) && len(pm.bad) >= len(pm.seeds) { + // Return an error to make the caller aware that we have reset + // back to seeds. + clear(pm.bad) + pm.handleAddr(pm.seeds) + return nil, ErrReset + } + for k := range pm.good { + if _, ok := pm.peers[k]; ok { + // Address is in use + log.Errorf("address already on peers list: %v", k) + continue + } + if _, ok := pm.bad[k]; ok { + // Should not happen but let's make sure we aren't + // reusing an address. + log.Errorf("found address on bad list: %v", k) + continue + } + + // Remove from good list and add to bad list. Thus active peers + // are len(bad)-len(peers) + delete(pm.good, k) + pm.bad[k] = struct{}{} + + return NewPeer(pm.net, slot, k) + } + return nil, ErrNoAddresses +} + +func (pm *PeerManager) connect(ctx context.Context, p *peer) error { + log.Tracef("connect: %v %v", p.Id(), p) + defer log.Tracef("connect exit: %v %v", p.Id(), p) + + if err := p.connect(ctx); err != nil { + return fmt.Errorf("new peer: %v", err) + } + + pm.mtx.Lock() + if _, ok := pm.peers[p.String()]; ok { + // This race does indeed happen because Good can add this. 
+ p.close() // close new peer and don't add it + log.Errorf("peer already connected: %v", p) + pm.mtx.Unlock() + return fmt.Errorf("peer already connected: %v", p) + } + pm.peers[p.String()] = p + pm.mtx.Unlock() + + pm.peersC <- p // block return nil } -func (pm *PeerManager) PeersRandom(count int) ([]string, error) { - log.Tracef("PeersRandom") +func (pm *PeerManager) connectSlot(ctx context.Context, p *peer) { + if err := pm.connect(ctx, p); err != nil { + // log.Errorf("%v", err) + pm.slotsC <- p.Id() // give slot back + return + } +} - i := 0 - peers := make([]string, 0, count) +func (pm *PeerManager) Run(ctx context.Context) error { + log.Tracef("Run") + defer log.Tracef("Run") - pm.peersMtx.RLock() - allGoodPeers := len(pm.peersGood) - allBadPeers := len(pm.peersBad) - for k := range pm.peersGood { - peers = append(peers, k) - i++ - if i >= count { - break + if len(pm.seeds) == 0 { + log.Infof("Starting DNS seeder") + minW := 5 + maxW := 59 + for { + err := pm.seed(ctx) + if err != nil { + log.Debugf("seed: %v", err) + } else { + break + } + + holdOff := time.Duration(minW+rand.IntN(maxW-minW)) * time.Second + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(holdOff): + } } + log.Infof("DNS seeding complete") } - pm.peersMtx.RUnlock() + pm.HandleAddr(pm.seeds) // Add all seeds to good list + + log.Infof("Starting peer manager") + defer log.Infof("Peer manager stopped") - log.Debugf("PeersRandom exit %v (good %v bad %v)", len(peers), - allGoodPeers, allBadPeers) + // Start connecting "want" number of peers. + pm.slotsC = make(chan int, pm.want) + for i := 0; i < pm.want; i++ { + pm.slotsC <- i + } + for { + select { + case slot := <-pm.slotsC: + p, err := pm.randomPeer(ctx, slot) + if err != nil { + // basically no addresses, hold-off + <-time.After(7 * time.Second) + pm.slotsC <- slot // give the slot back + continue + } + go pm.connectSlot(ctx, p) - return peers, nil + case <-ctx.Done(): + log.Infof("exit") + return ctx.Err() + } + } } diff --git a/service/tbc/peermanager_test.go b/service/tbc/peermanager_test.go new file mode 100644 index 00000000..e750ee45 --- /dev/null +++ b/service/tbc/peermanager_test.go @@ -0,0 +1,87 @@ +// Copyright (c) 2024 Hemi Labs, Inc. +// Use of this source code is governed by the MIT License, +// which can be found in the LICENSE file. 
+ +package tbc + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/btcsuite/btcd/wire" +) + +func ping(ctx context.Context, t *testing.T, p *peer) error { + err := p.write(time.Second, wire.NewMsgPing(uint64(time.Now().Unix()))) + if err != nil { + return err + } + + for { + msg, _, err := p.read(time.Second) + if errors.Is(err, wire.ErrUnknownMessage) { + continue + } else if err != nil { + return err + } + switch msg.(type) { + case *wire.MsgPong: + return nil + } + } +} + +func TestPeerManager(t *testing.T) { + t.Skip("this test connects to testnet3") + want := 2 + wantLoop := want * 2 + pm, err := NewPeerManager(wire.TestNet3, []string{}, want) + if err != nil { + t.Fatal(err) + } + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + go func() { + err = pm.Run(ctx) + if err != nil { + t.Logf("%v", err) + } + }() + + var wg sync.WaitGroup + for peers := 0; peers < wantLoop; { + p, err := pm.RandomConnect(ctx) + if err != nil { + // Should not be reached + t.Fatal(err) + } + wg.Add(1) + go func(pp *peer) { + defer wg.Done() + err := ping(ctx, t, pp) + if err != nil { + t.Logf("ping returned error but that's fine: %v", err) + } + // Always close + err = pm.Bad(ctx, pp.String()) + if err != nil { + panic(err) + } + }(p) + peers++ + t.Logf("%v", pm) + } + + wg.Wait() + + if len(pm.bad) < wantLoop { + t.Fatalf("not enough bad, got %v wanted %v", len(pm.bad), wantLoop) + } + if len(pm.peers) != 0 { + t.Fatalf("not enough peers, got %v wanted %v", len(pm.peers), 0) + } +} diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 2f174145..e4096cfc 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "math/big" - "math/rand/v2" "net" "net/http" "os" @@ -29,7 +28,6 @@ import ( "github.com/dustin/go-humanize" "github.com/juju/loggo" "github.com/prometheus/client_golang/prometheus" - "github.com/syndtr/goleveldb/leveldb" "github.com/hemilabs/heminetwork/api" "github.com/hemilabs/heminetwork/api/tbcapi" @@ -65,20 +63,6 @@ var ( localnetSeeds = []string{ "127.0.0.1:18444", } - testnetSeeds = []string{ - "testnet-seed.bitcoin.jonasschnelli.ch:18333", - "seed.tbtc.petertodd.org:18333", - "seed.testnet.bitcoin.sprovoost.nl:18333", - "testnet-seed.bluematt.me:18333", - } - mainnetSeeds = []string{ - "seed.bitcoin.sipa.be:8333", - "dnsseed.bluematt.me:8333", - "dnsseed.bitcoin.dashjr.org:8333", - "seed.bitcoinstats.com:8333", - "seed.bitnodes.io:8333", - "seed.bitcoin.jonasschnelli.ch:8333", - } ) var log = loggo.GetLogger("tbc") @@ -120,10 +104,6 @@ type Server struct { mtx sync.RWMutex wg sync.WaitGroup - // Note that peers is protected by mtx NOT peersMtx - // TODO: move to PeerManager? 
- peers map[string]*peer // active but not necessarily connected - cfg *Config // stats @@ -135,13 +115,12 @@ type Server struct { mempool *mempool // bitcoin network + seeds []string // XXX remove wireNet wire.BitcoinNet chainParams *chaincfg.Params timeSource blockchain.MedianTimeSource - seeds []string checkpoints map[chainhash.Hash]uint64 - - pm *PeerManager + pm *PeerManager blocks *ttl.TTL // outstanding block downloads [hash]when/where pings *ttl.TTL // outstanding pings @@ -152,6 +131,10 @@ type Server struct { db tbcd.Database // Prometheus + prom struct { + syncInfo SyncInfo + connected, good, bad int + } // periodically updated by promPoll isRunning bool cmdsProcessed prometheus.Counter @@ -181,8 +164,6 @@ func NewServer(cfg *Config) (*Server, error) { cfg: cfg, printTime: time.Now().Add(10 * time.Second), blocks: blocks, - peers: make(map[string]*peer, cfg.PeersWanted), - pm: newPeerManager(), pings: pings, timeSource: blockchain.NewMedianTime(), cmdsProcessed: prometheus.NewCounter(prometheus.CounterOpts{ @@ -193,38 +174,45 @@ func NewServer(cfg *Config) (*Server, error) { sessions: make(map[string]*tbcWs), requestTimeout: defaultRequestTimeout, } - if s.cfg.MempoolEnabled { - s.mempool, err = mempoolNew() - if err != nil { - return nil, err + + log.Infof("MEMPOOL IS CURRENTLY BROKEN AND HAS BEEN DISABLED") + s.cfg.MempoolEnabled = false + if false { + if s.cfg.MempoolEnabled { + s.mempool, err = mempoolNew() + if err != nil { + return nil, err + } } } - // We could use a PGURI verification here. - + wanted := defaultPeersWanted switch cfg.Network { case "mainnet": s.wireNet = wire.MainNet s.chainParams = &chaincfg.MainNetParams - s.seeds = mainnetSeeds s.checkpoints = mainnetCheckpoints + case "testnet3": s.wireNet = wire.TestNet3 s.chainParams = &chaincfg.TestNet3Params - s.seeds = testnetSeeds s.checkpoints = testnet3Checkpoints + case networkLocalnet: s.wireNet = wire.TestNet s.chainParams = &chaincfg.RegressionNetParams - s.seeds = localnetSeeds s.checkpoints = make(map[chainhash.Hash]uint64) + wanted = 1 + default: return nil, fmt.Errorf("invalid network: %v", cfg.Network) } - if len(cfg.Seeds) > 0 { - s.seeds = cfg.Seeds + pm, err := NewPeerManager(s.wireNet, s.cfg.Seeds, wanted) + if err != nil { + return nil, err } + s.pm = pm return s, nil } @@ -242,263 +230,6 @@ func (s *Server) getHeaders(ctx context.Context, p *peer, hash *chainhash.Hash) return nil } -// TODO: move to PeerManager? -func (s *Server) seed(pctx context.Context, peersWanted int) ([]string, error) { - log.Tracef("seed") - defer log.Tracef("seed exit") - - peers, err := s.pm.PeersRandom(peersWanted) - if err != nil { - return nil, fmt.Errorf("peers random: %w", err) - } - // return peers from db first - if len(peers) >= peersWanted { - return peers, nil - } - - // Seed - resolver := &net.Resolver{} - ctx, cancel := context.WithTimeout(pctx, 15*time.Second) - defer cancel() - - errorsSeen := 0 - var moreSeeds []string - for _, v := range s.seeds { - host, port, err := net.SplitHostPort(v) - if err != nil { - log.Errorf("Failed to parse host/port: %v", err) - errorsSeen++ - continue - } - ips, err := resolver.LookupIP(ctx, "ip", host) - if err != nil { - log.Errorf("lookup: %v", err) - errorsSeen++ - continue - } - - for _, ip := range ips { - moreSeeds = append(moreSeeds, net.JoinHostPort(ip.String(), port)) - } - } - - if errorsSeen == len(s.seeds) { - return nil, errors.New("could not seed") - } - - // insert into peers table // TODO: ? - peers = append(peers, moreSeeds...) 
- - // return fake peers but don't save them to the database // TODO: ? - return peers, nil -} - -// TODO: move to PeerManager? -func (s *Server) seedForever(ctx context.Context, peersWanted int) ([]string, error) { - log.Tracef("seedForever") - defer log.Tracef("seedForever") - - minW := 5 - maxW := 59 - for { - holdOff := time.Duration(minW+rand.IntN(maxW-minW)) * time.Second - var em string - peers, err := s.seed(ctx, peersWanted) - if err != nil { - em = fmt.Sprintf("seed error: %v, retrying in %v", err, holdOff) - } else if peers != nil && len(peers) == 0 { - em = fmt.Sprintf("no peers found, retrying in %v", holdOff) - } else { - // great success! - return peers, nil - } - log.Errorf("%v", em) - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(holdOff): - } - } -} - -// TODO: move to PeerManager? -func (s *Server) peerAdd(p *peer) error { - log.Tracef("peerAdd: %v", p.address) - s.mtx.Lock() - defer s.mtx.Unlock() - if _, ok := s.peers[p.address]; ok { - return fmt.Errorf("peer exists: %v", p) - } - s.peers[p.address] = p - return nil -} - -// TODO: move to PeerManager? -func (s *Server) peerDelete(address string) { - log.Tracef("peerDelete: %v", address) - s.mtx.Lock() - delete(s.peers, address) - s.mtx.Unlock() -} - -// TODO: move to PeerManager? -func (s *Server) peersLen() int { - s.mtx.Lock() - defer s.mtx.Unlock() - return len(s.peers) -} - -// TODO: move to PeerManager? -func (s *Server) peerManager(ctx context.Context) error { - log.Tracef("PeerManager") - defer log.Tracef("PeerManager exit") - - // Channel for peering signals - peersWanted := s.cfg.PeersWanted - peerC := make(chan string, peersWanted) - - log.Infof("Peer manager connecting to %v peers", peersWanted) - seeds, err := s.seedForever(ctx, peersWanted) - if err != nil { - // context canceled - return fmt.Errorf("seed: %w", err) - } - if len(seeds) == 0 { - // should not happen - return errors.New("no seeds found") - } - - // Add a ticker that times out every 13 seconds regardless of what is - // going on. This will be nice and jittery and detect bad beers - // peridiocally. - loopTimeout := 13 * time.Second - loopTicker := time.NewTicker(loopTimeout) - - x := 0 - for { - peersActive := s.peersLen() - log.Debugf("peerManager active %v wanted %v", peersActive, peersWanted) - if peersActive < peersWanted { - // XXX we may want to make peers play along with waitgroup - - // Connect peer - for range peersWanted - peersActive { - peer, err := NewPeer(s.wireNet, seeds[x]) - if err != nil { - // This really should not happen - log.Errorf("new peer: %v", err) - } else { - if err := s.peerAdd(peer); err != nil { - log.Debugf("add peer: %v", err) - } else { - go s.peerConnect(ctx, peerC, peer) - } - } - - // Increment x before peer add since we want to - // move on to the next seed in case the peer is - // already in connected. - x++ - if x >= len(seeds) { - // XXX duplicate code from above - seeds, err = s.seedForever(ctx, peersWanted) - if err != nil { - // Context canceled - return fmt.Errorf("seed: %w", err) - } - if len(seeds) == 0 { - // should not happen - return errors.New("no seeds found") - } - x = 0 - } - - } - } - - // Unfortunately we need a timer here to restart the loop. The - // error is a laptop goes to sleep, all peers disconnect, RSTs - // are not seen by sleeping laptop, laptop wakes up. Now the - // expiration timers are all expired but not noticed by the - // laptop. 
- select { - case <-ctx.Done(): - return ctx.Err() - case address := <-peerC: - // peer exited, connect to new one - s.peerDelete(address) - - // Expire all blocks for peer - n := s.blocks.DeleteByValue(func(p any) bool { - return p.(*peer).address == address - }) - log.Debugf("peer exited: %v blocks canceled: %v", - address, n) - case <-loopTicker.C: - log.Tracef("pinging active peers: %v", s.peersLen()) - go s.pingAllPeers(ctx) - loopTicker.Reset(loopTimeout) - } - } -} - -// TODO: move to PeerManager? -func (s *Server) localPeerManager(ctx context.Context) error { - log.Tracef("localPeerManager") - defer log.Tracef("localPeerManager exit") - - if len(s.seeds) != 1 { - return fmt.Errorf("expecting 1 seed, received %d", len(s.seeds)) - } - - peersWanted := 1 - peerC := make(chan string, peersWanted) - - peer, err := NewPeer(s.wireNet, s.seeds[0]) - if err != nil { - return fmt.Errorf("new peer: %w", err) - } - - log.Infof("Local peer manager connecting to %v peers", peersWanted) - - for { - if err := s.peerAdd(peer); err != nil { - return err - } - go s.peerConnect(ctx, peerC, peer) - - select { - case <-ctx.Done(): - return ctx.Err() - case address := <-peerC: - s.peerDelete(address) - log.Infof("peer exited: %v", address) - } - - // hold off on reconnect - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(10 * time.Second): - log.Infof("peer exited: %v", "hold of timeout") - } - } -} - -// TODO: move to PeerManager? -func (s *Server) startPeerManager(ctx context.Context) error { - log.Tracef("startPeerManager") - defer log.Tracef("startPeerManager exit") - - switch s.cfg.Network { - case networkLocalnet: - return s.localPeerManager(ctx) - } - return s.peerManager(ctx) -} - -// TODO: move to PeerManager? func (s *Server) pingExpired(ctx context.Context, key any, value any) { log.Tracef("pingExpired") defer log.Tracef("pingExpired exit") @@ -514,42 +245,26 @@ func (s *Server) pingExpired(ctx context.Context, key any, value any) { } } -// TODO: move to PeerManager? -func (s *Server) pingAllPeers(ctx context.Context) { - log.Tracef("pingAllPeers") - defer log.Tracef("pingAllPeers exit") - - // XXX reason and explain why this cannot be reentrant - s.mtx.Lock() - defer s.mtx.Unlock() - - for _, p := range s.peers { - select { - case <-ctx.Done(): - return - default: - } - if !p.isConnected() { - continue - } +func (s *Server) pingPeer(ctx context.Context, p *peer) { + log.Tracef("pingPeer %v", p) + defer log.Tracef("pingPeer %v exit", p) - // Cancel outstanding ping, should not happen - peer := p.String() - s.pings.Cancel(peer) + // Cancel outstanding ping, should not happen + peer := p.String() + s.pings.Cancel(peer) - // We don't really care about the response. We just want to - // write to the connection to make it fail if the other side - // went away. - log.Debugf("Pinging: %v", p) - err := p.write(defaultCmdTimeout, wire.NewMsgPing(uint64(time.Now().Unix()))) - if err != nil { - log.Debugf("ping %v: %v", p, err) - return - } - - // Record outstanding ping - s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil) + // We don't really care about the response. We just want to + // write to the connection to make it fail if the other side + // went away. 
+ log.Debugf("Pinging: %v", p) + err := p.write(defaultCmdTimeout, wire.NewMsgPing(uint64(time.Now().Unix()))) + if err != nil { + log.Debugf("ping %v: %v", p, err) + return } + + // Record outstanding ping + s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil) } func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, raw []byte) (bool, error) { @@ -888,76 +603,36 @@ func (s *Server) sod(ctx context.Context, p *peer) (*chainhash.Hash, error) { return hash, nil } -// TODO: move to PeerManager? -func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { - log.Tracef("peerConnect %v", p) - defer func() { - select { - case peerC <- p.String(): // remove from peer manager - default: - log.Tracef("could not signal peer channel: %v", p) - } - log.Tracef("peerConnect exit %v", p) - }() +func (s *Server) handlePeer(ctx context.Context, p *peer) error { + log.Tracef("handlePeer %v", p) - tctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - err := p.connect(tctx) defer func() { - // Remove from database; it's ok to be aggressive if it - // failed with no route to host or failed with i/o - // timeout or invalid network (ipv4/ipv6). - // - // This does have the side-effect of draining the peer - // table during network outages but that is ok. The - // peers table will be rebuild based on DNS seeds. - // - // XXX This really belongs in peer manager. - if err := s.pm.PeerDelete(p.String()); err != nil { - log.Errorf("peer manager delete (%v): %v", p, err) - } - if err := p.close(); err != nil && !errors.Is(err, net.ErrClosed) { - if errors.Is(err, net.ErrClosed) { - panic(err) - } - log.Errorf("peer disconnect: %v %v", p, err) - } + log.Tracef("handlePeer exit %v", p) + s.pm.Bad(ctx, p.String()) // always close peer }() - if err != nil { - log.Debugf("connect failed %v: %v", p, err) - return - } // See if our tip is indeed canonical. ch, err := s.sod(ctx, p) if err != nil { - if errors.Is(err, leveldb.ErrClosed) { - // Database is closed, This is terminal. - log.Criticalf("sod: %v database closed", p) - return - } + return err } else if ch != nil { err := s.getHeaders(ctx, p, ch) if err != nil { - // Database is closed, This is terminal. - log.Errorf("sod get headers: %v %v %v", p, ch, err) - return + return err } } // Get p2p information. err = p.write(defaultCmdTimeout, wire.NewMsgGetAddr()) if err != nil && !errors.Is(err, net.ErrClosed) { - log.Errorf("peer get addr: %v", err) - return + return err } if s.cfg.MempoolEnabled { // Start building the mempool. err = p.write(defaultCmdTimeout, wire.NewMsgMemPool()) if err != nil && !errors.Is(err, net.ErrClosed) { - log.Errorf("peer mempool: %v", err) - return + return err } } @@ -972,28 +647,27 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { // See if we were interrupted, for the love of pete add ctx to wire select { case <-ctx.Done(): - return + return ctx.Err() default: } msg, raw, err := p.read(defaultCmdTimeout) if errors.Is(err, wire.ErrUnknownMessage) { - // skip unknown + // skip unknown message continue } else if err != nil { // Check if context was canceled when read timeout occurs if errors.Is(err, os.ErrDeadlineExceeded) { select { case <-ctx.Done(): - return + return ctx.Err() default: } // Regular timeout, proceed with reading. 
continue } - log.Debugf("peer read %v: %v", p, err) - return + return err } if verbose { @@ -1002,8 +676,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { handled, err := s.handleGeneric(ctx, p, msg, raw) if err != nil { - log.Errorf("%T: %v", msg, err) - return + return err } if handled { continue @@ -1021,8 +694,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { switch m := msg.(type) { case *wire.MsgHeaders: if err := s.handleHeaders(ctx, p, m); err != nil { - log.Errorf("handle headers: %v", err) - return + return err } default: @@ -1053,6 +725,65 @@ func (s *Server) promRunning() float64 { return 0 } +func (s *Server) promSynced() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + if s.prom.syncInfo.Synced { + return 1 + } + return 0 +} + +func (s *Server) promBlockHeader() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.syncInfo.BlockHeader.Height) +} + +func (s *Server) promUtxo() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.syncInfo.Utxo.Height) +} + +func (s *Server) promTx() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.syncInfo.Tx.Height) +} + +func (s *Server) promConnectedPeers() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.connected) +} + +func (s *Server) promGoodPeers() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.good) +} + +func (s *Server) promBadPeers() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.bad) +} + +func (s *Server) promPoll(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + + s.prom.syncInfo = s.Synced(ctx) + s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats() + + } +} + // blksMissing checks the block cache and the database and returns true if all // blocks have not been downloaded. This function must be called with the lock // held. @@ -1081,9 +812,7 @@ func (s *Server) handleAddr(_ context.Context, p *peer, msg *wire.MsgAddr) error peers[i] = net.JoinHostPort(a.IP.String(), strconv.Itoa(int(a.Port))) } - if err := s.pm.PeersInsert(peers); err != nil { - return fmt.Errorf("insert peers: %w", err) - } + s.pm.HandleAddr(peers) return nil } @@ -1102,9 +831,7 @@ func (s *Server) handleAddrV2(_ context.Context, p *peer, msg *wire.MsgAddrV2) e peers = append(peers, addr) } - if err := s.pm.PeersInsert(peers); err != nil { - return fmt.Errorf("insert peers: %w", err) - } + s.pm.HandleAddr(peers) return nil } @@ -1245,24 +972,6 @@ func (s *Server) handleTx(ctx context.Context, p *peer, msg *wire.MsgTx, raw []b return s.mempool.txsInsert(ctx, msg, raw) } -// randomPeer returns a random peer from the map. Must be called with lock -// held. -// XXX move to PeerManager -func (s *Server) randomPeer(ctx context.Context) (*peer, error) { - log.Tracef("randomPeer") - defer log.Tracef("randomPeer exit") - - // unassigned slot, download block - for _, p := range s.peers { - if !p.isConnected() { - // Not connected yet - continue - } - return p, nil - } - return nil, errors.New("no peers") -} - func (s *Server) syncBlocks(ctx context.Context) { log.Tracef("syncBlocks") defer log.Tracef("syncBlocks exit") @@ -1303,7 +1012,7 @@ func (s *Server) syncBlocks(ctx context.Context) { } // XXX rethink closure, this is because of index flag mutex. 
go func() { - if err = s.SyncIndexersToBest(ctx); err != nil && err != ErrAlreadyIndexing { + if err = s.SyncIndexersToBest(ctx); err != nil && !errors.Is(err, ErrAlreadyIndexing) && !errors.Is(err, context.Canceled) { // XXX this is probably not a panic. panic(fmt.Errorf("sync blocks: %w", err)) } @@ -1319,7 +1028,7 @@ // Already being downloaded. continue } - rp, err := s.randomPeer(ctx) + rp, err := s.pm.Random() if err != nil { // This can happen during startup or when the network // is starved. @@ -1513,13 +1222,7 @@ func (s *Server) handleBlock(ctx context.Context, p *peer, msg *wire.MsgBlock, r } // Grab some peer stats as well - goodPeers, badPeers := s.pm.Stats() - // Gonna take it right into the Danger Zone! (double mutex) - for _, peer := range s.peers { - if peer.isConnected() { - connectedPeers++ - } - } + connectedPeers, goodPeers, badPeers := s.pm.Stats() // This is super awkward but prevents calculating N inserts * // time.Before(10*time.Second). @@ -1527,11 +1230,10 @@ log.Infof("Inserted %v blocks (%v) in the last %v", s.blocksInserted, humanize.Bytes(s.blocksSize), delta) - log.Infof("Pending blocks %v/%v active peers %v connected peers %v "+ - "good peers %v bad peers %v mempool %v %v", - s.blocks.Len(), defaultPendingBlocks, len(s.peers), - connectedPeers, goodPeers, badPeers, mempoolCount, - humanize.Bytes(uint64(mempoolSize))) + log.Infof("Pending blocks %v/%v connected peers %v good peers %v "+ + "bad peers %v mempool %v %v", + s.blocks.Len(), defaultPendingBlocks, connectedPeers, goodPeers, + badPeers, mempoolCount, humanize.Bytes(uint64(mempoolSize))) // Reset stats s.blocksSize = 0 @@ -2106,6 +1808,41 @@ func (s *Server) Run(pctx context.Context) error { Name: "running", Help: "Is tbc service running.", }, s.promRunning), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "synced", + Help: "Is tbc synced.", + }, s.promSynced), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "blockheader_height", + Help: "Blockheader height.", + }, s.promBlockHeader), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "utxo_sync_height", + Help: "Height of utxo indexer.", + }, s.promUtxo), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "tx_sync_height", + Help: "Height of tx indexer.", + }, s.promTx), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "peers_connected", + Help: "Number of peers connected.", + }, s.promConnectedPeers), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "peers_good", + Help: "Number of good peers.", + }, s.promGoodPeers), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "peers_bad", + Help: "Number of bad peers.", + }, s.promBadPeers), } s.wg.Add(1) go func() { @@ -2116,13 +1853,22 @@ } log.Infof("prometheus clean shutdown") }() + s.wg.Add(1) + go func() { + defer s.wg.Done() + err := s.promPoll(ctx) + if err != nil { + log.Errorf("prometheus poll terminated with error: %v", err) + return + } + }() } errC := make(chan error) s.wg.Add(1) go func() { defer s.wg.Done() - if err := s.startPeerManager(ctx); err != nil { + if err := s.pm.Run(ctx); err != nil { select { case errC <- err: default: @@ -2130,6 +1876,43 @@ func (s *Server) 
Run(pctx context.Context) error { } }() + // connected peers + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + p, err := s.pm.RandomConnect(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + // Should not be reached + log.Errorf("random connect: %v", err) + return + } + go func(pp *peer) { + err := s.handlePeer(ctx, pp) + if err != nil { + log.Debugf("%v: %v", pp, err) + } + }(p) + } + }() + + // ping loop + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(13 * time.Second): + } + s.pm.All(ctx, s.pingPeer) + } + }() + select { case <-ctx.Done(): err = ctx.Err() diff --git a/service/tbc/tbc_test.go b/service/tbc/tbc_test.go index 6f5ea087..5ed46f07 100644 --- a/service/tbc/tbc_test.go +++ b/service/tbc/tbc_test.go @@ -828,6 +828,7 @@ func createTbcServer(ctx context.Context, t *testing.T, mappedPeerPort nat.Port) cfg.LevelDBHome = home cfg.Network = networkLocalnet cfg.ListenAddress = tcbListenAddress + cfg.Seeds = localnetSeeds tbcServer, err := NewServer(cfg) if err != nil { t.Fatal(err) diff --git a/service/tbc/tbcfork_test.go b/service/tbc/tbcfork_test.go index ad79babc..a17d3fdc 100644 --- a/service/tbc/tbcfork_test.go +++ b/service/tbc/tbcfork_test.go @@ -27,7 +27,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/juju/loggo" - "github.com/hemilabs/heminetwork/api/tbcapi" "github.com/hemilabs/heminetwork/database/tbcd" ) @@ -877,7 +876,7 @@ func TestFork(t *testing.T) { t.Fatal(err) } // t.Logf("%v", spew.Sdump(n.chain[n.Best()[0].String()])) - time.Sleep(500 * time.Millisecond) // XXX + time.Sleep(2 * time.Second) // XXX // Connect tbc service cfg := &Config{ @@ -886,12 +885,13 @@ func TestFork(t *testing.T) { BlockheaderCache: 1000, BlockSanity: false, LevelDBHome: t.TempDir(), - ListenAddress: tbcapi.DefaultListen, // TODO: should use random free port // LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG", MaxCachedTxs: 1000, // XXX Network: networkLocalnet, PeersWanted: 1, PrometheusListenAddress: "", + Seeds: []string{"127.0.0.1:18444"}, + ListenAddress: "localhost:8881", } _ = loggo.ConfigureLoggers(cfg.LogLevel) s, err := NewServer(cfg) @@ -1114,7 +1114,7 @@ func TestIndexNoFork(t *testing.T) { panic(err) } }() - time.Sleep(time.Second) + time.Sleep(time.Second * 2) // Connect tbc service cfg := &Config{ @@ -1123,12 +1123,13 @@ func TestIndexNoFork(t *testing.T) { BlockheaderCache: 1000, BlockSanity: false, LevelDBHome: t.TempDir(), - ListenAddress: tbcapi.DefaultListen, + ListenAddress: "localhost:8882", // LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG", MaxCachedTxs: 1000, // XXX Network: networkLocalnet, PeersWanted: 1, PrometheusListenAddress: "", + Seeds: []string{"127.0.0.1:18444"}, } _ = loggo.ConfigureLoggers(cfg.LogLevel) s, err := NewServer(cfg) @@ -1285,7 +1286,7 @@ func TestIndexFork(t *testing.T) { panic(err) } }() - time.Sleep(time.Second) + time.Sleep(time.Second * 2) // Connect tbc service cfg := &Config{ @@ -1294,13 +1295,14 @@ func TestIndexFork(t *testing.T) { BlockheaderCache: 1000, BlockSanity: false, LevelDBHome: t.TempDir(), - ListenAddress: tbcapi.DefaultListen, + ListenAddress: "localhost:8883", // LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG", MaxCachedTxs: 1000, // XXX Network: networkLocalnet, PeersWanted: 1, PrometheusListenAddress: "", MempoolEnabled: false, + Seeds: []string{"127.0.0.1:18444"}, } _ = loggo.ConfigureLoggers(cfg.LogLevel) s, err := NewServer(cfg)
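
Usage sketch for the new peer flow: the refactor inverts control, so PeerManager.Run owns DNS seeding and dialing of the "want" connection slots, RandomConnect blocks until a slot holds a handshaked peer, and Bad disconnects a peer and returns its slot for Run to reuse (which is why handlePeer calls pm.Bad in its defer). The code below is a minimal sketch, not part of the change: it assumes only the API introduced in this diff (NewPeerManager, Run, RandomConnect, Bad, peer.write, the package-level log), the function name examplePeerLoop and the ping payload are illustrative, and it would have to live inside package tbc because peer is unexported.

package tbc

import (
	"context"
	"errors"
	"time"

	"github.com/btcsuite/btcd/wire"
)

// examplePeerLoop is an illustrative driver for the refactored PeerManager:
// Run fills the connection slots, RandomConnect hands back handshaked peers,
// and Bad retires a peer and frees its slot for Run to reuse.
func examplePeerLoop(ctx context.Context) error {
	// nil seeds means Run falls back to the hard-coded DNS seeds.
	pm, err := NewPeerManager(wire.TestNet3, nil, 2)
	if err != nil {
		return err
	}

	go func() {
		// Run blocks until ctx is canceled.
		if err := pm.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
			log.Errorf("peer manager: %v", err)
		}
	}()

	for {
		p, err := pm.RandomConnect(ctx) // blocks until a slot connects
		if err != nil {
			return err // only fails once ctx is done
		}
		go func(pp *peer) {
			// Always retire the peer so its slot goes back to Run.
			defer pm.Bad(ctx, pp.String())

			// Talk to the peer; a single ping is enough for the sketch.
			err := pp.write(5*time.Second, wire.NewMsgPing(uint64(time.Now().Unix())))
			if err != nil {
				log.Debugf("ping %v: %v", pp, err)
			}
		}(p)
	}
}

The slot channel behaves like a counting semaphore of size want: a slot id is taken when randomPeer hands out an address and is pushed back either by connectSlot on a failed dial or by Bad when a connected peer is retired, so each connected peer must be passed to Bad exactly once.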