From 4a72b7141e16ec149645028d6875baefadeb1e46 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Thu, 3 Oct 2024 12:29:16 +0100 Subject: [PATCH 01/21] Start cleaning up peer and peermanager --- service/tbc/peer.go | 9 +++-- service/tbc/peer_manager.go | 73 +++++++++++++++---------------------- service/tbc/tbc.go | 8 ++-- 3 files changed, 40 insertions(+), 50 deletions(-) diff --git a/service/tbc/peer.go b/service/tbc/peer.go index fa54d3aa..6c831497 100644 --- a/service/tbc/peer.go +++ b/service/tbc/peer.go @@ -212,9 +212,12 @@ func (p *peer) close() error { defer log.Tracef("close exit") p.mtx.Lock() - defer p.mtx.Unlock() - if p.conn != nil { - return p.conn.Close() + conn := p.conn + p.conn = nil + p.isDialing = true // mark not connected + p.mtx.Unlock() + if conn != nil { + return conn.Close() } return net.ErrClosed } diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index b52f5f9c..93d600ce 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -17,16 +17,16 @@ const ( // PeerManager keeps track of the available peers and their quality. type PeerManager struct { peersMtx sync.RWMutex - peersGood map[string]struct{} - peersBad map[string]struct{} + good map[string]struct{} + bad map[string]struct{} goodSeenMax int // keep track of max good peers seen to prevent early purge } -// newPeerManager returns a new peer manager. -func newPeerManager() *PeerManager { +// NewPeerManager returns a new peer manager. +func NewPeerManager(seeds []string) *PeerManager { return &PeerManager{ - peersGood: make(map[string]struct{}, maxPeersGood), - peersBad: make(map[string]struct{}, maxPeersBad), + good: make(map[string]struct{}, maxPeersGood), + bad: make(map[string]struct{}, maxPeersBad), } } @@ -37,37 +37,32 @@ func (pm *PeerManager) Stats() (int, int) { pm.peersMtx.RLock() defer pm.peersMtx.RUnlock() - return len(pm.peersGood), len(pm.peersBad) + return len(pm.good), len(pm.bad) } // PeersInsert adds known peers. -func (pm *PeerManager) PeersInsert(peers []string) error { - log.Tracef("PeersInsert") - defer log.Tracef("PeersInsert exit") +func (pm *PeerManager) HandleAddr(peers []string) error { + log.Tracef("HandleAddr %v", len(peers)) pm.peersMtx.Lock() for _, addr := range peers { - if _, ok := pm.peersBad[addr]; ok { + if _, ok := pm.bad[addr]; ok { // Skip bad peers. continue } - pm.peersGood[addr] = struct{}{} + pm.good[addr] = struct{}{} } - allGoodPeers := len(pm.peersGood) - allBadPeers := len(pm.peersBad) - pm.peersMtx.Unlock() - log.Debugf("PeersInsert exit %v good %v bad %v", - len(peers), allGoodPeers, allBadPeers) + len(peers), len(pm.good), len(pm.bad)) + pm.peersMtx.Unlock() return nil } -// PeerDelete marks the peer as bad. -// XXX this function only returns nil!? -func (pm *PeerManager) PeerDelete(address string) error { - log.Tracef("PeerDelete") - defer log.Tracef("PeerDelete exit") +// Bad marks the peer as bad. +func (pm *PeerManager) Bad(address string) error { + log.Tracef("Bad") + defer log.Tracef("Bad exit") _, _, err := net.SplitHostPort(address) if err != nil { @@ -77,57 +72,49 @@ func (pm *PeerManager) PeerDelete(address string) error { pm.peersMtx.Lock() // Remove peer from good. - delete(pm.peersGood, address) + delete(pm.good, address) // Mark peer as bad. - pm.peersBad[address] = struct{}{} + pm.bad[address] = struct{}{} // Crude hammer to reset good/bad state of peers // XXX goodSeenMax should be a connection test; not a threshold. // Another reason to move all peer stuff into the manager. 
- pm.goodSeenMax = max(pm.goodSeenMax, len(pm.peersGood)) - if pm.goodSeenMax > minPeersRequired && len(pm.peersGood) < minPeersRequired { + pm.goodSeenMax = max(pm.goodSeenMax, len(pm.good)) + if pm.goodSeenMax > minPeersRequired && len(pm.good) < minPeersRequired { // Kill all peers to force caller to reseed. This happens when // network is down for a while and all peers are moved into // bad map. - clear(pm.peersGood) - clear(pm.peersBad) - pm.peersGood = make(map[string]struct{}, 8192) - pm.peersBad = make(map[string]struct{}, 8192) + clear(pm.good) + clear(pm.bad) + pm.good = make(map[string]struct{}, 8192) + pm.bad = make(map[string]struct{}, 8192) pm.goodSeenMax = 0 log.Debugf("peer cache purged") } - - allGoodPeers := len(pm.peersGood) - allBadPeers := len(pm.peersBad) - + log.Debugf("Bad exit good %v bad %v", len(pm.good), len(pm.bad)) pm.peersMtx.Unlock() - log.Debugf("PeerDelete exit good %v bad %v", allGoodPeers, allBadPeers) - return nil } func (pm *PeerManager) PeersRandom(count int) ([]string, error) { - log.Tracef("PeersRandom") + log.Tracef("PeersRandom %v", count) i := 0 peers := make([]string, 0, count) pm.peersMtx.RLock() - allGoodPeers := len(pm.peersGood) - allBadPeers := len(pm.peersBad) - for k := range pm.peersGood { + for k := range pm.good { peers = append(peers, k) i++ if i >= count { break } } + log.Debugf("PeersRandom exit %v (good %v bad %v)", + len(peers), len(pm.good), len(pm.bad)) pm.peersMtx.RUnlock() - log.Debugf("PeersRandom exit %v (good %v bad %v)", len(peers), - allGoodPeers, allBadPeers) - return peers, nil } diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 2f174145..13b251c4 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -182,7 +182,7 @@ func NewServer(cfg *Config) (*Server, error) { printTime: time.Now().Add(10 * time.Second), blocks: blocks, peers: make(map[string]*peer, cfg.PeersWanted), - pm: newPeerManager(), + pm: NewPeerManager(nil), pings: pings, timeSource: blockchain.NewMedianTime(), cmdsProcessed: prometheus.NewCounter(prometheus.CounterOpts{ @@ -913,7 +913,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { // peers table will be rebuild based on DNS seeds. // // XXX This really belongs in peer manager. 
- if err := s.pm.PeerDelete(p.String()); err != nil { + if err := s.pm.Bad(p.String()); err != nil { log.Errorf("peer manager delete (%v): %v", p, err) } if err := p.close(); err != nil && !errors.Is(err, net.ErrClosed) { @@ -1081,7 +1081,7 @@ func (s *Server) handleAddr(_ context.Context, p *peer, msg *wire.MsgAddr) error peers[i] = net.JoinHostPort(a.IP.String(), strconv.Itoa(int(a.Port))) } - if err := s.pm.PeersInsert(peers); err != nil { + if err := s.pm.HandleAddr(peers); err != nil { return fmt.Errorf("insert peers: %w", err) } @@ -1102,7 +1102,7 @@ func (s *Server) handleAddrV2(_ context.Context, p *peer, msg *wire.MsgAddrV2) e peers = append(peers, addr) } - if err := s.pm.PeersInsert(peers); err != nil { + if err := s.pm.HandleAddr(peers); err != nil { return fmt.Errorf("insert peers: %w", err) } From a77527d3bfe02e6faf5ad690b82bd77b143f109b Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Thu, 3 Oct 2024 16:39:43 +0100 Subject: [PATCH 02/21] Rewrite a new peer manager --- service/tbc/peer.go | 3 + service/tbc/peer_manager.go | 308 ++++++++++++++++++++++++++++---- service/tbc/peermanager_test.go | 82 +++++++++ service/tbc/tbc.go | 44 ++--- 4 files changed, 367 insertions(+), 70 deletions(-) create mode 100644 service/tbc/peermanager_test.go diff --git a/service/tbc/peer.go b/service/tbc/peer.go index 6c831497..56f06fe8 100644 --- a/service/tbc/peer.go +++ b/service/tbc/peer.go @@ -211,6 +211,9 @@ func (p *peer) close() error { log.Tracef("close") defer log.Tracef("close exit") + if p == nil { + panic("p") + } p.mtx.Lock() conn := p.conn p.conn = nil diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index 93d600ce..bacdfb81 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -5,29 +5,125 @@ package tbc import ( + "context" + "errors" + "fmt" + "math/rand/v2" "net" "sync" + "time" + + "github.com/btcsuite/btcd/wire" ) const ( - maxPeersGood = 1 << 13 - maxPeersBad = 1 << 13 + maxPeersGood = 1024 + maxPeersBad = 1024 +) + +var ( + testnet3Seeds = []string{ + "testnet-seed.bitcoin.jonasschnelli.ch:18333", + "seed.tbtc.petertodd.org:18333", + "seed.testnet.bitcoin.sprovoost.nl:18333", + "testnet-seed.bluematt.me:18333", + } + mainnetSeeds = []string{ + "seed.bitcoin.sipa.be:8333", + "dnsseed.bluematt.me:8333", + "dnsseed.bitcoin.dashjr.org:8333", + "seed.bitcoinstats.com:8333", + "seed.bitnodes.io:8333", + "seed.bitcoin.jonasschnelli.ch:8333", + } ) // PeerManager keeps track of the available peers and their quality. type PeerManager struct { - peersMtx sync.RWMutex - good map[string]struct{} - bad map[string]struct{} - goodSeenMax int // keep track of max good peers seen to prevent early purge + mtx sync.RWMutex + + net wire.BitcoinNet // bitcoin network to connect to + + want int // number of peers we want to be connected to + + dnsSeeds []string // hard coded dns seeds + seeds []string // seeds obtained from DNS + + peers map[string]*peer // connected peers + good map[string]struct{} + bad map[string]struct{} } // NewPeerManager returns a new peer manager. 
-func NewPeerManager(seeds []string) *PeerManager { +func NewPeerManager(net wire.BitcoinNet, want int) (*PeerManager, error) { + if want == 0 { + return nil, errors.New("peers wanted must not be 0") + } + + var dnsSeeds []string + switch net { + case wire.MainNet: + dnsSeeds = mainnetSeeds + case wire.TestNet3: + dnsSeeds = testnet3Seeds + case wire.TestNet: + default: + return nil, fmt.Errorf("invalid network: %v", net) + } + return &PeerManager{ - good: make(map[string]struct{}, maxPeersGood), - bad: make(map[string]struct{}, maxPeersBad), + net: net, + want: want, + dnsSeeds: dnsSeeds, + good: make(map[string]struct{}, maxPeersGood), + bad: make(map[string]struct{}, maxPeersBad), + peers: make(map[string]*peer, want), + }, nil +} + +func (pm *PeerManager) String() string { + pm.mtx.RLock() + defer pm.mtx.RUnlock() + + return fmt.Sprintf("Bad exit peers %v good %v bad %v", + len(pm.peers), len(pm.good), len(pm.bad)) +} + +func (pm *PeerManager) seed(pctx context.Context) error { + log.Tracef("seed") + defer log.Tracef("seed exit") + + // Seed + resolver := &net.Resolver{} + ctx, cancel := context.WithTimeout(pctx, 15*time.Second) + defer cancel() + + errorsSeen := 0 + for _, v := range pm.dnsSeeds { + host, port, err := net.SplitHostPort(v) + if err != nil { + log.Errorf("Failed to parse host/port: %v", err) + errorsSeen++ + continue + } + ips, err := resolver.LookupIP(ctx, "ip", host) + if err != nil { + log.Errorf("lookup: %v", err) + errorsSeen++ + continue + } + + for _, ip := range ips { + address := net.JoinHostPort(ip.String(), port) + pm.seeds = append(pm.seeds, address) + } } + + if len(pm.seeds) == 0 { + return errors.New("could not dns seed") + } + + return nil } // Stats returns peer statistics. @@ -35,30 +131,85 @@ func (pm *PeerManager) Stats() (int, int) { log.Tracef("PeersStats") defer log.Tracef("PeersStats exit") - pm.peersMtx.RLock() - defer pm.peersMtx.RUnlock() + pm.mtx.RLock() + defer pm.mtx.RUnlock() return len(pm.good), len(pm.bad) } -// PeersInsert adds known peers. -func (pm *PeerManager) HandleAddr(peers []string) error { - log.Tracef("HandleAddr %v", len(peers)) - - pm.peersMtx.Lock() +func (pm *PeerManager) handleAddr(peers []string) { for _, addr := range peers { + _, _, err := net.SplitHostPort(addr) + if err != nil { + continue + } if _, ok := pm.bad[addr]; ok { // Skip bad peers. continue } pm.good[addr] = struct{}{} } - log.Debugf("PeersInsert exit %v good %v bad %v", + log.Debugf("HandleAddr exit %v good %v bad %v", len(peers), len(pm.good), len(pm.bad)) - pm.peersMtx.Unlock() +} + +// HandleAddr adds peers to good list. +func (pm *PeerManager) HandleAddr(peers []string) { + log.Tracef("HandleAddr %v", len(peers)) + + pm.mtx.Lock() + defer pm.mtx.Unlock() + pm.handleAddr(peers) +} + +// Good adds peer good list. +func (pm *PeerManager) Good(address string) error { + log.Tracef("Good") + defer log.Tracef("Good exit") + + _, _, err := net.SplitHostPort(address) + if err != nil { + return err + } + + pm.mtx.Lock() + defer pm.mtx.Unlock() + + // If peer is connected don't add it to good list + if _, ok := pm.peers[address]; ok { + return fmt.Errorf("peer active: %v", address) + } + + // Remove peer from bad. + delete(pm.bad, address) + // Add peer to good. 
+ pm.good[address] = struct{}{} + + log.Debugf("Good exit peers %v good %v bad %v", + len(pm.peers), len(pm.good), len(pm.bad)) return nil } +func (pm *PeerManager) Connected(p *peer) { + log.Tracef("Connected") + defer log.Tracef("Connected exit") + + address := p.String() + + pm.mtx.Lock() + defer pm.mtx.Unlock() + + // If peer is connected, ignore it + if _, ok := pm.peers[address]; !ok { + pm.peers[address] = p + } + delete(pm.bad, address) + delete(pm.good, address) + + log.Debugf("Connected exit peers %v good %v bad %v", + len(pm.peers), len(pm.good), len(pm.bad)) +} + // Bad marks the peer as bad. func (pm *PeerManager) Bad(address string) error { log.Tracef("Bad") @@ -69,42 +220,37 @@ func (pm *PeerManager) Bad(address string) error { return err } - pm.peersMtx.Lock() + pm.mtx.Lock() + + // If peer is connected, disconnect it and mark it bad + if p, ok := pm.peers[address]; ok { + if p != nil { + p.close() + } + delete(pm.peers, address) + } // Remove peer from good. delete(pm.good, address) // Mark peer as bad. pm.bad[address] = struct{}{} - // Crude hammer to reset good/bad state of peers - - // XXX goodSeenMax should be a connection test; not a threshold. - // Another reason to move all peer stuff into the manager. - pm.goodSeenMax = max(pm.goodSeenMax, len(pm.good)) - if pm.goodSeenMax > minPeersRequired && len(pm.good) < minPeersRequired { - // Kill all peers to force caller to reseed. This happens when - // network is down for a while and all peers are moved into - // bad map. - clear(pm.good) - clear(pm.bad) - pm.good = make(map[string]struct{}, 8192) - pm.bad = make(map[string]struct{}, 8192) - pm.goodSeenMax = 0 - log.Debugf("peer cache purged") - } - log.Debugf("Bad exit good %v bad %v", len(pm.good), len(pm.bad)) - pm.peersMtx.Unlock() + log.Debugf("Bad exit peers %v good %v bad %v", + len(pm.peers), len(pm.good), len(pm.bad)) + + pm.mtx.Unlock() return nil } +// XXX remove func (pm *PeerManager) PeersRandom(count int) ([]string, error) { log.Tracef("PeersRandom %v", count) i := 0 peers := make([]string, 0, count) - pm.peersMtx.RLock() + pm.mtx.RLock() for k := range pm.good { peers = append(peers, k) i++ @@ -114,7 +260,91 @@ func (pm *PeerManager) PeersRandom(count int) ([]string, error) { } log.Debugf("PeersRandom exit %v (good %v bad %v)", len(peers), len(pm.good), len(pm.bad)) - pm.peersMtx.RUnlock() + pm.mtx.RUnlock() return peers, nil } + +func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { + log.Tracef("RandomConnect") + defer log.Tracef("RandomConnect") + + // Block until a connect slot opens up + for { + address := "" + pm.mtx.Lock() + if len(pm.peers) < pm.want { + // Check to see if we are out of good peers + if len(pm.good) == 0 { + pm.handleAddr(pm.seeds) + } + for k := range pm.good { + address = k + delete(pm.good, k) + } + } + pm.mtx.Unlock() + + if len(address) > 0 { + // connect peer + p, err := NewPeer(pm.net, address) + if err != nil { + // XXX can't happen, remove error case from NewPeer + log.Debugf("%v", err) + continue + } + err = p.connect(ctx) + if err != nil { + log.Debugf("%v: %v", p, err) + continue + } + pm.Connected(p) + return p, nil + } + + // Block but do timeout to see if something was reaped + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(3 * time.Second): + } + } + // NewPeer(pm.net, address) + return nil, errors.New("nope") +} + +func (pm *PeerManager) Run(ctx context.Context) error { + log.Tracef("Run") + defer log.Tracef("Run") + + log.Infof("Starting DNS seeder") + minW := 5 + 
maxW := 59 + for { + err := pm.seed(ctx) + if err != nil { + log.Debugf("seed: %v", err) + } else { + break + } + + holdOff := time.Duration(minW+rand.IntN(maxW-minW)) * time.Second + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(holdOff): + } + } + pm.HandleAddr(pm.seeds) // Add all seeds to good list + log.Infof("DNS seeding complete") + + log.Infof("Starting peer manager") + defer log.Infof("Peer manager stopped") + + select { + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} diff --git a/service/tbc/peermanager_test.go b/service/tbc/peermanager_test.go new file mode 100644 index 00000000..460eccb1 --- /dev/null +++ b/service/tbc/peermanager_test.go @@ -0,0 +1,82 @@ +package tbc + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/btcsuite/btcd/wire" +) + +func ping(ctx context.Context, t *testing.T, p *peer) error { + err := p.write(time.Second, wire.NewMsgPing(uint64(time.Now().Unix()))) + if err != nil { + return err + } + + for { + msg, _, err := p.read(time.Second) + if errors.Is(err, wire.ErrUnknownMessage) { + continue + } else if err != nil { + return err + } + switch msg.(type) { + case *wire.MsgPong: + return nil + } + } +} + +func TestPeerManager(t *testing.T) { + want := 2 + wantLoop := want * 2 + pm, err := NewPeerManager(wire.TestNet3, want) + if err != nil { + t.Fatal(err) + } + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + go func() { + err = pm.Run(ctx) + if err != nil { + t.Logf("%v", err) + } + }() + + var wg sync.WaitGroup + for peers := 0; peers < wantLoop; { + p, err := pm.RandomConnect(ctx) + if err != nil { + // Should not be reached + t.Fatal(err) + } + wg.Add(1) + go func(pp *peer) { + defer wg.Done() + err := ping(ctx, t, pp) + if err != nil { + t.Logf("ping returned error but that's fine: %v", err) + } + // Always close + err = pm.Bad(pp.String()) + if err != nil { + panic(err) + } + }(p) + peers++ + t.Logf("%v", pm) + } + + wg.Wait() + + if len(pm.bad) != wantLoop { + t.Fatalf("not enough bad, got %v wanted %v", len(pm.bad), wantLoop) + } + if len(pm.peers) != 0 { + t.Fatalf("not enough peers, got %v wanted %v", len(pm.peers), 0) + } +} diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 13b251c4..e386f951 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -65,20 +65,6 @@ var ( localnetSeeds = []string{ "127.0.0.1:18444", } - testnetSeeds = []string{ - "testnet-seed.bitcoin.jonasschnelli.ch:18333", - "seed.tbtc.petertodd.org:18333", - "seed.testnet.bitcoin.sprovoost.nl:18333", - "testnet-seed.bluematt.me:18333", - } - mainnetSeeds = []string{ - "seed.bitcoin.sipa.be:8333", - "dnsseed.bluematt.me:8333", - "dnsseed.bitcoin.dashjr.org:8333", - "seed.bitcoinstats.com:8333", - "seed.bitnodes.io:8333", - "seed.bitcoin.jonasschnelli.ch:8333", - } ) var log = loggo.GetLogger("tbc") @@ -135,13 +121,12 @@ type Server struct { mempool *mempool // bitcoin network + seeds []string // XXX remove wireNet wire.BitcoinNet chainParams *chaincfg.Params timeSource blockchain.MedianTimeSource - seeds []string checkpoints map[chainhash.Hash]uint64 - - pm *PeerManager + pm *PeerManager blocks *ttl.TTL // outstanding block downloads [hash]when/where pings *ttl.TTL // outstanding pings @@ -182,7 +167,6 @@ func NewServer(cfg *Config) (*Server, error) { printTime: time.Now().Add(10 * time.Second), blocks: blocks, peers: make(map[string]*peer, cfg.PeersWanted), - pm: NewPeerManager(nil), pings: pings, timeSource: blockchain.NewMedianTime(), cmdsProcessed: 
prometheus.NewCounter(prometheus.CounterOpts{ @@ -200,31 +184,33 @@ func NewServer(cfg *Config) (*Server, error) { } } - // We could use a PGURI verification here. - switch cfg.Network { case "mainnet": s.wireNet = wire.MainNet s.chainParams = &chaincfg.MainNetParams - s.seeds = mainnetSeeds s.checkpoints = mainnetCheckpoints + case "testnet3": s.wireNet = wire.TestNet3 s.chainParams = &chaincfg.TestNet3Params - s.seeds = testnetSeeds s.checkpoints = testnet3Checkpoints + case networkLocalnet: s.wireNet = wire.TestNet s.chainParams = &chaincfg.RegressionNetParams - s.seeds = localnetSeeds s.checkpoints = make(map[chainhash.Hash]uint64) + + // XXX currently broken + default: return nil, fmt.Errorf("invalid network: %v", cfg.Network) } - if len(cfg.Seeds) > 0 { - s.seeds = cfg.Seeds + pm, err := NewPeerManager(s.wireNet, 64) // XXX 64 is a constant + if err != nil { + return nil, err } + s.pm = pm return s, nil } @@ -1081,9 +1067,7 @@ func (s *Server) handleAddr(_ context.Context, p *peer, msg *wire.MsgAddr) error peers[i] = net.JoinHostPort(a.IP.String(), strconv.Itoa(int(a.Port))) } - if err := s.pm.HandleAddr(peers); err != nil { - return fmt.Errorf("insert peers: %w", err) - } + s.pm.HandleAddr(peers) return nil } @@ -1102,9 +1086,7 @@ func (s *Server) handleAddrV2(_ context.Context, p *peer, msg *wire.MsgAddrV2) e peers = append(peers, addr) } - if err := s.pm.HandleAddr(peers); err != nil { - return fmt.Errorf("insert peers: %w", err) - } + s.pm.HandleAddr(peers) return nil } From c3fa593f4ce6142dbd1d17f31f5db893f8a87232 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Thu, 3 Oct 2024 18:03:54 +0100 Subject: [PATCH 03/21] Make new peer manager mostly work --- service/tbc/crawler.go | 2 +- service/tbc/peer_manager.go | 65 ++--- service/tbc/tbc.go | 484 +++++++----------------------------- 3 files changed, 126 insertions(+), 425 deletions(-) diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index b2e2d8b4..ca0fea68 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -1309,7 +1309,7 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e return } // get a random peer - p, err := s.randomPeer(ctx) + p, err := s.pm.Random() if err != nil { s.mtx.Unlock() log.Errorf("sync indexers random peer: %v", err) diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index bacdfb81..049c6d87 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -85,7 +85,7 @@ func (pm *PeerManager) String() string { pm.mtx.RLock() defer pm.mtx.RUnlock() - return fmt.Sprintf("Bad exit peers %v good %v bad %v", + return fmt.Sprintf("connected %v good %v bad %v", len(pm.peers), len(pm.good), len(pm.bad)) } @@ -127,13 +127,13 @@ func (pm *PeerManager) seed(pctx context.Context) error { } // Stats returns peer statistics. -func (pm *PeerManager) Stats() (int, int) { +func (pm *PeerManager) Stats() (int, int, int) { log.Tracef("PeersStats") defer log.Tracef("PeersStats exit") pm.mtx.RLock() defer pm.mtx.RUnlock() - return len(pm.good), len(pm.bad) + return len(pm.peers), len(pm.good), len(pm.bad) } func (pm *PeerManager) handleAddr(peers []string) { @@ -142,6 +142,10 @@ func (pm *PeerManager) handleAddr(peers []string) { if err != nil { continue } + if _, ok := pm.peers[addr]; ok { + // Skip connected peers. + continue + } if _, ok := pm.bad[addr]; ok { // Skip bad peers. 
continue @@ -243,26 +247,19 @@ func (pm *PeerManager) Bad(address string) error { return nil } -// XXX remove -func (pm *PeerManager) PeersRandom(count int) ([]string, error) { - log.Tracef("PeersRandom %v", count) - - i := 0 - peers := make([]string, 0, count) +func (pm *PeerManager) Random() (*peer, error) { + log.Tracef("Random") + defer log.Tracef("Random exit") pm.mtx.RLock() - for k := range pm.good { - peers = append(peers, k) - i++ - if i >= count { - break + pm.mtx.RUnlock() + for _, p := range pm.peers { + if p.isConnected() { + return p, nil } } - log.Debugf("PeersRandom exit %v (good %v bad %v)", - len(peers), len(pm.good), len(pm.bad)) - pm.mtx.RUnlock() - return peers, nil + return nil, errors.New("no peers") } func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { @@ -271,17 +268,27 @@ func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { // Block until a connect slot opens up for { + log.Debugf("peer manager: %v", pm) address := "" pm.mtx.Lock() - if len(pm.peers) < pm.want { - // Check to see if we are out of good peers - if len(pm.good) == 0 { - pm.handleAddr(pm.seeds) - } - for k := range pm.good { - address = k - delete(pm.good, k) - } + + // XXX add reset caluse + //if len(pm.peers) < pm.want { + // // Check to see if we are out of good peers + // if len(pm.peers) == 0 && len(pm.good) == 0 && len(pm.bad) > 0 { + // log.Infof("RESET, needs flag") + // clear(pm.good) + // clear(pm.bad) + // pm.handleAddr(pm.seeds) + // } + // for k := range pm.good { + // address = k + // delete(pm.good, k) + // } + //} + for k := range pm.good { + address = k + continue } pm.mtx.Unlock() @@ -290,12 +297,12 @@ func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { p, err := NewPeer(pm.net, address) if err != nil { // XXX can't happen, remove error case from NewPeer - log.Debugf("%v", err) + log.Errorf("%v", err) continue } err = p.connect(ctx) if err != nil { - log.Debugf("%v: %v", p, err) + pm.Bad(address) continue } pm.Connected(p) diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index e386f951..21528b55 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "math/big" - "math/rand/v2" "net" "net/http" "os" @@ -29,7 +28,6 @@ import ( "github.com/dustin/go-humanize" "github.com/juju/loggo" "github.com/prometheus/client_golang/prometheus" - "github.com/syndtr/goleveldb/leveldb" "github.com/hemilabs/heminetwork/api" "github.com/hemilabs/heminetwork/api/tbcapi" @@ -106,10 +104,6 @@ type Server struct { mtx sync.RWMutex wg sync.WaitGroup - // Note that peers is protected by mtx NOT peersMtx - // TODO: move to PeerManager? 
- peers map[string]*peer // active but not necessarily connected - cfg *Config // stats @@ -166,7 +160,6 @@ func NewServer(cfg *Config) (*Server, error) { cfg: cfg, printTime: time.Now().Add(10 * time.Second), blocks: blocks, - peers: make(map[string]*peer, cfg.PeersWanted), pings: pings, timeSource: blockchain.NewMedianTime(), cmdsProcessed: prometheus.NewCounter(prometheus.CounterOpts{ @@ -177,13 +170,19 @@ func NewServer(cfg *Config) (*Server, error) { sessions: make(map[string]*tbcWs), requestTimeout: defaultRequestTimeout, } - if s.cfg.MempoolEnabled { - s.mempool, err = mempoolNew() - if err != nil { - return nil, err + + log.Infof("MEMPOOL IS CURRENTLY BROKEN AND HAS BEEN DISABLED") + s.cfg.MempoolEnabled = false + if false { + if s.cfg.MempoolEnabled { + s.mempool, err = mempoolNew() + if err != nil { + return nil, err + } } } + wanted := defaultPeersWanted switch cfg.Network { case "mainnet": s.wireNet = wire.MainNet @@ -199,6 +198,7 @@ func NewServer(cfg *Config) (*Server, error) { s.wireNet = wire.TestNet s.chainParams = &chaincfg.RegressionNetParams s.checkpoints = make(map[chainhash.Hash]uint64) + wanted = 1 // XXX currently broken @@ -206,7 +206,7 @@ func NewServer(cfg *Config) (*Server, error) { return nil, fmt.Errorf("invalid network: %v", cfg.Network) } - pm, err := NewPeerManager(s.wireNet, 64) // XXX 64 is a constant + pm, err := NewPeerManager(s.wireNet, wanted) if err != nil { return nil, err } @@ -228,263 +228,6 @@ func (s *Server) getHeaders(ctx context.Context, p *peer, hash *chainhash.Hash) return nil } -// TODO: move to PeerManager? -func (s *Server) seed(pctx context.Context, peersWanted int) ([]string, error) { - log.Tracef("seed") - defer log.Tracef("seed exit") - - peers, err := s.pm.PeersRandom(peersWanted) - if err != nil { - return nil, fmt.Errorf("peers random: %w", err) - } - // return peers from db first - if len(peers) >= peersWanted { - return peers, nil - } - - // Seed - resolver := &net.Resolver{} - ctx, cancel := context.WithTimeout(pctx, 15*time.Second) - defer cancel() - - errorsSeen := 0 - var moreSeeds []string - for _, v := range s.seeds { - host, port, err := net.SplitHostPort(v) - if err != nil { - log.Errorf("Failed to parse host/port: %v", err) - errorsSeen++ - continue - } - ips, err := resolver.LookupIP(ctx, "ip", host) - if err != nil { - log.Errorf("lookup: %v", err) - errorsSeen++ - continue - } - - for _, ip := range ips { - moreSeeds = append(moreSeeds, net.JoinHostPort(ip.String(), port)) - } - } - - if errorsSeen == len(s.seeds) { - return nil, errors.New("could not seed") - } - - // insert into peers table // TODO: ? - peers = append(peers, moreSeeds...) - - // return fake peers but don't save them to the database // TODO: ? - return peers, nil -} - -// TODO: move to PeerManager? -func (s *Server) seedForever(ctx context.Context, peersWanted int) ([]string, error) { - log.Tracef("seedForever") - defer log.Tracef("seedForever") - - minW := 5 - maxW := 59 - for { - holdOff := time.Duration(minW+rand.IntN(maxW-minW)) * time.Second - var em string - peers, err := s.seed(ctx, peersWanted) - if err != nil { - em = fmt.Sprintf("seed error: %v, retrying in %v", err, holdOff) - } else if peers != nil && len(peers) == 0 { - em = fmt.Sprintf("no peers found, retrying in %v", holdOff) - } else { - // great success! - return peers, nil - } - log.Errorf("%v", em) - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(holdOff): - } - } -} - -// TODO: move to PeerManager? 
-func (s *Server) peerAdd(p *peer) error { - log.Tracef("peerAdd: %v", p.address) - s.mtx.Lock() - defer s.mtx.Unlock() - if _, ok := s.peers[p.address]; ok { - return fmt.Errorf("peer exists: %v", p) - } - s.peers[p.address] = p - return nil -} - -// TODO: move to PeerManager? -func (s *Server) peerDelete(address string) { - log.Tracef("peerDelete: %v", address) - s.mtx.Lock() - delete(s.peers, address) - s.mtx.Unlock() -} - -// TODO: move to PeerManager? -func (s *Server) peersLen() int { - s.mtx.Lock() - defer s.mtx.Unlock() - return len(s.peers) -} - -// TODO: move to PeerManager? -func (s *Server) peerManager(ctx context.Context) error { - log.Tracef("PeerManager") - defer log.Tracef("PeerManager exit") - - // Channel for peering signals - peersWanted := s.cfg.PeersWanted - peerC := make(chan string, peersWanted) - - log.Infof("Peer manager connecting to %v peers", peersWanted) - seeds, err := s.seedForever(ctx, peersWanted) - if err != nil { - // context canceled - return fmt.Errorf("seed: %w", err) - } - if len(seeds) == 0 { - // should not happen - return errors.New("no seeds found") - } - - // Add a ticker that times out every 13 seconds regardless of what is - // going on. This will be nice and jittery and detect bad beers - // peridiocally. - loopTimeout := 13 * time.Second - loopTicker := time.NewTicker(loopTimeout) - - x := 0 - for { - peersActive := s.peersLen() - log.Debugf("peerManager active %v wanted %v", peersActive, peersWanted) - if peersActive < peersWanted { - // XXX we may want to make peers play along with waitgroup - - // Connect peer - for range peersWanted - peersActive { - peer, err := NewPeer(s.wireNet, seeds[x]) - if err != nil { - // This really should not happen - log.Errorf("new peer: %v", err) - } else { - if err := s.peerAdd(peer); err != nil { - log.Debugf("add peer: %v", err) - } else { - go s.peerConnect(ctx, peerC, peer) - } - } - - // Increment x before peer add since we want to - // move on to the next seed in case the peer is - // already in connected. - x++ - if x >= len(seeds) { - // XXX duplicate code from above - seeds, err = s.seedForever(ctx, peersWanted) - if err != nil { - // Context canceled - return fmt.Errorf("seed: %w", err) - } - if len(seeds) == 0 { - // should not happen - return errors.New("no seeds found") - } - x = 0 - } - - } - } - - // Unfortunately we need a timer here to restart the loop. The - // error is a laptop goes to sleep, all peers disconnect, RSTs - // are not seen by sleeping laptop, laptop wakes up. Now the - // expiration timers are all expired but not noticed by the - // laptop. - select { - case <-ctx.Done(): - return ctx.Err() - case address := <-peerC: - // peer exited, connect to new one - s.peerDelete(address) - - // Expire all blocks for peer - n := s.blocks.DeleteByValue(func(p any) bool { - return p.(*peer).address == address - }) - log.Debugf("peer exited: %v blocks canceled: %v", - address, n) - case <-loopTicker.C: - log.Tracef("pinging active peers: %v", s.peersLen()) - go s.pingAllPeers(ctx) - loopTicker.Reset(loopTimeout) - } - } -} - -// TODO: move to PeerManager? 
-func (s *Server) localPeerManager(ctx context.Context) error { - log.Tracef("localPeerManager") - defer log.Tracef("localPeerManager exit") - - if len(s.seeds) != 1 { - return fmt.Errorf("expecting 1 seed, received %d", len(s.seeds)) - } - - peersWanted := 1 - peerC := make(chan string, peersWanted) - - peer, err := NewPeer(s.wireNet, s.seeds[0]) - if err != nil { - return fmt.Errorf("new peer: %w", err) - } - - log.Infof("Local peer manager connecting to %v peers", peersWanted) - - for { - if err := s.peerAdd(peer); err != nil { - return err - } - go s.peerConnect(ctx, peerC, peer) - - select { - case <-ctx.Done(): - return ctx.Err() - case address := <-peerC: - s.peerDelete(address) - log.Infof("peer exited: %v", address) - } - - // hold off on reconnect - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(10 * time.Second): - log.Infof("peer exited: %v", "hold of timeout") - } - } -} - -// TODO: move to PeerManager? -func (s *Server) startPeerManager(ctx context.Context) error { - log.Tracef("startPeerManager") - defer log.Tracef("startPeerManager exit") - - switch s.cfg.Network { - case networkLocalnet: - return s.localPeerManager(ctx) - } - return s.peerManager(ctx) -} - -// TODO: move to PeerManager? func (s *Server) pingExpired(ctx context.Context, key any, value any) { log.Tracef("pingExpired") defer log.Tracef("pingExpired exit") @@ -500,43 +243,43 @@ func (s *Server) pingExpired(ctx context.Context, key any, value any) { } } -// TODO: move to PeerManager? -func (s *Server) pingAllPeers(ctx context.Context) { - log.Tracef("pingAllPeers") - defer log.Tracef("pingAllPeers exit") - - // XXX reason and explain why this cannot be reentrant - s.mtx.Lock() - defer s.mtx.Unlock() - - for _, p := range s.peers { - select { - case <-ctx.Done(): - return - default: - } - if !p.isConnected() { - continue - } - - // Cancel outstanding ping, should not happen - peer := p.String() - s.pings.Cancel(peer) - - // We don't really care about the response. We just want to - // write to the connection to make it fail if the other side - // went away. - log.Debugf("Pinging: %v", p) - err := p.write(defaultCmdTimeout, wire.NewMsgPing(uint64(time.Now().Unix()))) - if err != nil { - log.Debugf("ping %v: %v", p, err) - return - } - - // Record outstanding ping - s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil) - } -} +// XXX BRING THIS BACK +//func (s *Server) pingAllPeers(ctx context.Context) { +// log.Tracef("pingAllPeers") +// defer log.Tracef("pingAllPeers exit") +// +// // XXX reason and explain why this cannot be reentrant +// s.mtx.Lock() +// defer s.mtx.Unlock() +// +// for _, p := range s.peers { +// select { +// case <-ctx.Done(): +// return +// default: +// } +// if !p.isConnected() { +// continue +// } +// +// // Cancel outstanding ping, should not happen +// peer := p.String() +// s.pings.Cancel(peer) +// +// // We don't really care about the response. We just want to +// // write to the connection to make it fail if the other side +// // went away. +// log.Debugf("Pinging: %v", p) +// err := p.write(defaultCmdTimeout, wire.NewMsgPing(uint64(time.Now().Unix()))) +// if err != nil { +// log.Debugf("ping %v: %v", p, err) +// return +// } +// +// // Record outstanding ping +// s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil) +// } +//} func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, raw []byte) (bool, error) { // Do accept addr and ping commands before we consider the peer up. 
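
The ping loop above is disabled rather than deleted because writing to a peer is the only reliable way to notice that the remote end silently went away (the series calls out the sleeping-laptop case where RSTs are never seen). Below is a minimal sketch of that keepalive idea, kept independent of the ttl package the real code uses; it reuses the peer type's write and String methods from peer.go, while the pinger type, its timeouts and callbacks are illustrative only:

package tbc

import (
	"sync"
	"time"

	"github.com/btcsuite/btcd/wire"
)

// pinger tracks outstanding pings; if a pong does not arrive before the
// timer fires, the expiry callback can close and discard the peer.
type pinger struct {
	mtx     sync.Mutex
	pending map[string]*time.Timer // keyed by peer address
}

func newPinger() *pinger {
	return &pinger{pending: make(map[string]*time.Timer)}
}

// ping writes a ping to the peer and arms an expiry timer. The write itself
// already detects many dead connections; the timer catches the rest.
func (pg *pinger) ping(p *peer, timeout time.Duration, expired func(*peer)) error {
	err := p.write(5*time.Second, wire.NewMsgPing(uint64(time.Now().Unix())))
	if err != nil {
		return err
	}
	pg.mtx.Lock()
	defer pg.mtx.Unlock()
	pg.pending[p.String()] = time.AfterFunc(timeout, func() { expired(p) })
	return nil
}

// pong cancels the outstanding timer when the matching pong arrives.
func (pg *pinger) pong(p *peer) {
	pg.mtx.Lock()
	defer pg.mtx.Unlock()
	if t, ok := pg.pending[p.String()]; ok {
		t.Stop()
		delete(pg.pending, p.String())
	}
}
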
@@ -874,76 +617,36 @@ func (s *Server) sod(ctx context.Context, p *peer) (*chainhash.Hash, error) { return hash, nil } -// TODO: move to PeerManager? -func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { - log.Tracef("peerConnect %v", p) - defer func() { - select { - case peerC <- p.String(): // remove from peer manager - default: - log.Tracef("could not signal peer channel: %v", p) - } - log.Tracef("peerConnect exit %v", p) - }() +func (s *Server) handlePeer(ctx context.Context, p *peer) error { + log.Tracef("handlePeer %v", p) - tctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - err := p.connect(tctx) defer func() { - // Remove from database; it's ok to be aggressive if it - // failed with no route to host or failed with i/o - // timeout or invalid network (ipv4/ipv6). - // - // This does have the side-effect of draining the peer - // table during network outages but that is ok. The - // peers table will be rebuild based on DNS seeds. - // - // XXX This really belongs in peer manager. - if err := s.pm.Bad(p.String()); err != nil { - log.Errorf("peer manager delete (%v): %v", p, err) - } - if err := p.close(); err != nil && !errors.Is(err, net.ErrClosed) { - if errors.Is(err, net.ErrClosed) { - panic(err) - } - log.Errorf("peer disconnect: %v %v", p, err) - } + log.Tracef("handlePeer exit %v", p) + s.pm.Bad(p.String()) // always close peer }() - if err != nil { - log.Debugf("connect failed %v: %v", p, err) - return - } // See if our tip is indeed canonical. ch, err := s.sod(ctx, p) if err != nil { - if errors.Is(err, leveldb.ErrClosed) { - // Database is closed, This is terminal. - log.Criticalf("sod: %v database closed", p) - return - } + return err } else if ch != nil { err := s.getHeaders(ctx, p, ch) if err != nil { - // Database is closed, This is terminal. - log.Errorf("sod get headers: %v %v %v", p, ch, err) - return + return err } } // Get p2p information. err = p.write(defaultCmdTimeout, wire.NewMsgGetAddr()) if err != nil && !errors.Is(err, net.ErrClosed) { - log.Errorf("peer get addr: %v", err) - return + return err } if s.cfg.MempoolEnabled { // Start building the mempool. err = p.write(defaultCmdTimeout, wire.NewMsgMemPool()) if err != nil && !errors.Is(err, net.ErrClosed) { - log.Errorf("peer mempool: %v", err) - return + return err } } @@ -958,28 +661,27 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { // See if we were interrupted, for the love of pete add ctx to wire select { case <-ctx.Done(): - return + return ctx.Err() default: } msg, raw, err := p.read(defaultCmdTimeout) if errors.Is(err, wire.ErrUnknownMessage) { - // skip unknown + // skip unknown message continue } else if err != nil { // Check if context was canceled when read timeout occurs if errors.Is(err, os.ErrDeadlineExceeded) { select { case <-ctx.Done(): - return + return ctx.Err() default: } // Regular timeout, proceed with reading. 
continue } - log.Debugf("peer read %v: %v", p, err) - return + return err } if verbose { @@ -988,8 +690,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { handled, err := s.handleGeneric(ctx, p, msg, raw) if err != nil { - log.Errorf("%T: %v", msg, err) - return + return err } if handled { continue @@ -1007,8 +708,7 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { switch m := msg.(type) { case *wire.MsgHeaders: if err := s.handleHeaders(ctx, p, m); err != nil { - log.Errorf("handle headers: %v", err) - return + return err } default: @@ -1227,24 +927,6 @@ func (s *Server) handleTx(ctx context.Context, p *peer, msg *wire.MsgTx, raw []b return s.mempool.txsInsert(ctx, msg, raw) } -// randomPeer returns a random peer from the map. Must be called with lock -// held. -// XXX move to PeerManager -func (s *Server) randomPeer(ctx context.Context) (*peer, error) { - log.Tracef("randomPeer") - defer log.Tracef("randomPeer exit") - - // unassigned slot, download block - for _, p := range s.peers { - if !p.isConnected() { - // Not connected yet - continue - } - return p, nil - } - return nil, errors.New("no peers") -} - func (s *Server) syncBlocks(ctx context.Context) { log.Tracef("syncBlocks") defer log.Tracef("syncBlocks exit") @@ -1301,7 +983,7 @@ func (s *Server) syncBlocks(ctx context.Context) { // Already being downloaded. continue } - rp, err := s.randomPeer(ctx) + rp, err := s.pm.Random() if err != nil { // This can happen during startup or when the network // is starved. @@ -1495,13 +1177,7 @@ func (s *Server) handleBlock(ctx context.Context, p *peer, msg *wire.MsgBlock, r } // Grab some peer stats as well - goodPeers, badPeers := s.pm.Stats() - // Gonna take it right into the Danger Zone! (double mutex) - for _, peer := range s.peers { - if peer.isConnected() { - connectedPeers++ - } - } + connectedPeers, goodPeers, badPeers := s.pm.Stats() // This is super awkward but prevents calculating N inserts * // time.Before(10*time.Second). 
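
The pm.Random() call that syncBlocks now uses above leans on a property of Go maps: the runtime deliberately randomizes iteration order, so returning the first connected entry gives a roughly random peer without any extra bookkeeping. A tiny sketch of that selection on its own; firstConnected is a hypothetical helper, not part of this series, and it assumes the peer type from peer.go:

package tbc

import "errors"

// firstConnected returns an arbitrary connected peer. Go's map iteration
// order changes from call to call, so the "first" usable entry spreads
// block download requests across peers for free.
func firstConnected(peers map[string]*peer) (*peer, error) {
	for _, p := range peers {
		if p.isConnected() {
			return p, nil
		}
	}
	return nil, errors.New("no connected peers")
}
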
@@ -1509,11 +1185,10 @@ func (s *Server) handleBlock(ctx context.Context, p *peer, msg *wire.MsgBlock, r log.Infof("Inserted %v blocks (%v) in the last %v", s.blocksInserted, humanize.Bytes(s.blocksSize), delta) - log.Infof("Pending blocks %v/%v active peers %v connected peers %v "+ - "good peers %v bad peers %v mempool %v %v", - s.blocks.Len(), defaultPendingBlocks, len(s.peers), - connectedPeers, goodPeers, badPeers, mempoolCount, - humanize.Bytes(uint64(mempoolSize))) + log.Infof("Pending blocks %v/%v connected peers %v good peers %v "+ + "bad peers %v mempool %v %v", + s.blocks.Len(), defaultPendingBlocks, connectedPeers, goodPeers, + badPeers, mempoolCount, humanize.Bytes(uint64(mempoolSize))) // Reset stats s.blocksSize = 0 @@ -2104,7 +1779,7 @@ func (s *Server) Run(pctx context.Context) error { s.wg.Add(1) go func() { defer s.wg.Done() - if err := s.startPeerManager(ctx); err != nil { + if err := s.pm.Run(ctx); err != nil { select { case errC <- err: default: @@ -2112,6 +1787,25 @@ func (s *Server) Run(pctx context.Context) error { } }() + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + p, err := s.pm.RandomConnect(ctx) + if err != nil { + // Should not be reached + log.Errorf("random connect: %v", err) + continue + } + go func(pp *peer) { + err := s.handlePeer(ctx, pp) + if err != nil { + log.Errorf("%v: %v", pp, err) + } + }(p) + } + }() + select { case <-ctx.Done(): err = ctx.Err() From 9c3bf627e9761d5243773d1872a71892bc171aea Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Fri, 4 Oct 2024 03:32:21 -0400 Subject: [PATCH 04/21] Please linter --- service/tbc/peer_manager.go | 4 ---- service/tbc/peermanager_test.go | 4 ++++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index 049c6d87..a0270afc 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -316,8 +316,6 @@ func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { case <-time.After(3 * time.Second): } } - // NewPeer(pm.net, address) - return nil, errors.New("nope") } func (pm *PeerManager) Run(ctx context.Context) error { @@ -352,6 +350,4 @@ func (pm *PeerManager) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() } - - return nil } diff --git a/service/tbc/peermanager_test.go b/service/tbc/peermanager_test.go index 460eccb1..e8c53186 100644 --- a/service/tbc/peermanager_test.go +++ b/service/tbc/peermanager_test.go @@ -1,3 +1,7 @@ +// Copyright (c) 2024 Hemi Labs, Inc. +// Use of this source code is governed by the MIT License, +// which can be found in the LICENSE file. 
+ package tbc import ( From 217fc970b34af76b1fbcda3892fcccac9620f40f Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Fri, 4 Oct 2024 12:52:40 +0100 Subject: [PATCH 05/21] Add some prometheus crap --- service/tbc/tbc.go | 107 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 21528b55..525da39f 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -131,6 +131,10 @@ type Server struct { db tbcd.Database // Prometheus + prom struct { + syncInfo SyncInfo + connected, good, bad int + } // periodically updated by promPoll isRunning bool cmdsProcessed prometheus.Counter @@ -739,6 +743,65 @@ func (s *Server) promRunning() float64 { return 0 } +func (s *Server) promSynced() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + if s.prom.syncInfo.Synced { + return 1 + } + return 0 +} + +func (s *Server) promBlockHeader() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.syncInfo.BlockHeader.Height) +} + +func (s *Server) promUtxo() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.syncInfo.Utxo.Height) +} + +func (s *Server) promTx() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.syncInfo.Tx.Height) +} + +func (s *Server) promConnectedPeers() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.connected) +} + +func (s *Server) promGoodPeers() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.good) +} + +func (s *Server) promBadPeers() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.bad) +} + +func (s *Server) promPoll(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + + s.prom.syncInfo = s.Synced(ctx) + s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats() + + } +} + // blksMissing checks the block cache and the database and returns true if all // blocks have not been downloaded. This function must be called with the lock // held. 
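
The gauges registered in the next hunk all read from the prom struct that promPoll refreshes every five seconds, so a Prometheus scrape never has to touch the database or the peer manager directly. Here is a self-contained sketch of that GaugeFunc-plus-poller pattern; the subsystem name, listen address and the fake stats function are placeholders for illustration, not values taken from this series:

package main

import (
	"context"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// cachedStats is refreshed by a background poller so that a Prometheus
// scrape only reads cached integers under a short-lived lock.
type cachedStats struct {
	mtx                  sync.RWMutex
	connected, good, bad int
}

func (c *cachedStats) connectedPeers() float64 {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return float64(c.connected)
}

// poll periodically pulls fresh numbers from the (possibly expensive)
// stats source and stores them for the gauge callbacks.
func (c *cachedStats) poll(ctx context.Context, stats func() (int, int, int)) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(5 * time.Second):
		}
		connected, good, bad := stats()
		c.mtx.Lock()
		c.connected, c.good, c.bad = connected, good, bad
		c.mtx.Unlock()
	}
}

func main() {
	c := &cachedStats{}
	prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Subsystem: "example", // placeholder subsystem
		Name:      "peers_connected",
		Help:      "Number of peers connected.",
	}, c.connectedPeers))

	go c.poll(context.Background(), func() (int, int, int) {
		return 3, 10, 2 // stand-in for PeerManager.Stats()
	})

	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe("localhost:2112", nil)
}
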
@@ -1763,6 +1826,41 @@ func (s *Server) Run(pctx context.Context) error { Name: "running", Help: "Is tbc service running.", }, s.promRunning), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "synced", + Help: "Is tbc synced.", + }, s.promSynced), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "blockheader_height", + Help: "Blockheader height.", + }, s.promBlockHeader), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "utxo_sync_height", + Help: "Height of utxo indexer.", + }, s.promUtxo), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "tx_sync_height", + Help: "Height of tx indexer.", + }, s.promTx), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "peers_connected", + Help: "Number of peers connected.", + }, s.promConnectedPeers), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "peers_good", + Help: "Number of good peers.", + }, s.promGoodPeers), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "peers_bad", + Help: "Number of bad peers.", + }, s.promBadPeers), } s.wg.Add(1) go func() { @@ -1773,6 +1871,15 @@ func (s *Server) Run(pctx context.Context) error { } log.Infof("prometheus clean shutdown") }() + s.wg.Add(1) + go func() { + defer s.wg.Done() + err := s.promPoll(ctx) + if err != nil { + log.Errorf("prometheus poll terminated with error: %v", err) + return + } + }() } errC := make(chan error) From 18febc8a45085e87e8b8d2267bf21b750ec83dfe Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Thu, 10 Oct 2024 11:35:54 +0100 Subject: [PATCH 06/21] bring back reset clause --- service/tbc/peer_manager.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index a0270afc..d19625a0 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -272,20 +272,11 @@ func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { address := "" pm.mtx.Lock() - // XXX add reset caluse - //if len(pm.peers) < pm.want { - // // Check to see if we are out of good peers - // if len(pm.peers) == 0 && len(pm.good) == 0 && len(pm.bad) > 0 { - // log.Infof("RESET, needs flag") - // clear(pm.good) - // clear(pm.bad) - // pm.handleAddr(pm.seeds) - // } - // for k := range pm.good { - // address = k - // delete(pm.good, k) - // } - //} + // Reset caluse + if len(pm.peers) < len(pm.seeds) { + clear(pm.bad) + pm.handleAddr(pm.seeds) + } for k := range pm.good { address = k continue From 3622cceecc4f15c40ab6cd3b4790a09421226142 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Thu, 10 Oct 2024 16:37:53 +0100 Subject: [PATCH 07/21] Make peer manager parallel --- service/tbc/peer.go | 23 ++++-- service/tbc/peer_manager.go | 160 +++++++++++++++++++++++------------- service/tbc/tbc.go | 4 +- 3 files changed, 122 insertions(+), 65 deletions(-) diff --git a/service/tbc/peer.go b/service/tbc/peer.go index 56f06fe8..b624a1d1 100644 --- a/service/tbc/peer.go +++ b/service/tbc/peer.go @@ -39,6 +39,7 @@ type peer struct { connected time.Time address string + id int protocolVersion uint32 network wire.BitcoinNet @@ -47,12 +48,16 @@ type peer struct { addrV2 bool } -func NewPeer(network wire.BitcoinNet, address string) (*peer, error) { - // XXX parse address and return failure if it's wrong +func NewPeer(network wire.BitcoinNet, address string, id int) 
(*peer, error) { + _, _, err := net.SplitHostPort(address) + if err != nil { + return nil, err + } return &peer{ protocolVersion: wire.ProtocolVersion, network: network, address: address, + id: id, }, nil } @@ -60,6 +65,10 @@ func (p *peer) String() string { return p.address } +func (p *peer) Id() int { + return p.id +} + func (p *peer) write(timeout time.Duration, msg wire.Message) error { p.mtx.Lock() conn := p.conn @@ -182,8 +191,13 @@ func (p *peer) connect(ctx context.Context) error { p.mtx.Unlock() d := net.Dialer{ - Timeout: 5 * time.Second, - KeepAlive: 9 * time.Second, + Deadline: time.Now().Add(5 * time.Second), + KeepAliveConfig: net.KeepAliveConfig{ + Enable: true, + Idle: 7 * time.Second, + Interval: 7 * time.Second, + Count: 2, + }, } log.Debugf("dialing %s", p.address) @@ -191,7 +205,6 @@ func (p *peer) connect(ctx context.Context) error { if err != nil { return fmt.Errorf("dial %v: %w", p.address, err) } - log.Debugf("done dialing %s", p.address) err = p.handshake(ctx, conn) if err != nil { diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index d19625a0..1d78ee02 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -52,6 +52,9 @@ type PeerManager struct { peers map[string]*peer // connected peers good map[string]struct{} bad map[string]struct{} + + peersC chan *peer // blocking channel for RandomConnect + slotsC chan int } // NewPeerManager returns a new peer manager. @@ -78,6 +81,7 @@ func NewPeerManager(net wire.BitcoinNet, want int) (*PeerManager, error) { good: make(map[string]struct{}, maxPeersGood), bad: make(map[string]struct{}, maxPeersBad), peers: make(map[string]*peer, want), + peersC: make(chan *peer, 0), }, nil } @@ -182,6 +186,9 @@ func (pm *PeerManager) Good(address string) error { if _, ok := pm.peers[address]; ok { return fmt.Errorf("peer active: %v", address) } + if _, ok := pm.good[address]; ok { + return fmt.Errorf("peer good: %v", address) + } // Remove peer from bad. delete(pm.bad, address) @@ -194,29 +201,9 @@ func (pm *PeerManager) Good(address string) error { return nil } -func (pm *PeerManager) Connected(p *peer) { - log.Tracef("Connected") - defer log.Tracef("Connected exit") - - address := p.String() - - pm.mtx.Lock() - defer pm.mtx.Unlock() - - // If peer is connected, ignore it - if _, ok := pm.peers[address]; !ok { - pm.peers[address] = p - } - delete(pm.bad, address) - delete(pm.good, address) - - log.Debugf("Connected exit peers %v good %v bad %v", - len(pm.peers), len(pm.good), len(pm.bad)) -} - // Bad marks the peer as bad. 
-func (pm *PeerManager) Bad(address string) error { - log.Tracef("Bad") +func (pm *PeerManager) Bad(ctx context.Context, address string) error { + log.Tracef("Bad %v", address) defer log.Tracef("Bad exit") _, _, err := net.SplitHostPort(address) @@ -229,7 +216,16 @@ func (pm *PeerManager) Bad(address string) error { // If peer is connected, disconnect it and mark it bad if p, ok := pm.peers[address]; ok { if p != nil { + // if we don't have a peer we are going to starve the slots + log.Debugf("got address without peer: %v", address) p.close() + go func() { + // Run outside of mutex + select { + case <-ctx.Done(): + case pm.slotsC <- p.Id(): + } + }() } delete(pm.peers, address) } @@ -257,6 +253,7 @@ func (pm *PeerManager) Random() (*peer, error) { if p.isConnected() { return p, nil } + log.Errorf("not connected: %v", p) // XXX } return nil, errors.New("no peers") @@ -267,45 +264,82 @@ func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) { defer log.Tracef("RandomConnect") // Block until a connect slot opens up - for { - log.Debugf("peer manager: %v", pm) - address := "" - pm.mtx.Lock() - - // Reset caluse - if len(pm.peers) < len(pm.seeds) { - clear(pm.bad) - pm.handleAddr(pm.seeds) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case p := <-pm.peersC: + return p, nil + } +} + +func (pm *PeerManager) randomAddress(ctx context.Context) (string, error) { + pm.mtx.Lock() + defer pm.mtx.Unlock() + + // Reset caluse + if len(pm.good) < len(pm.seeds) && len(pm.bad) > len(pm.seeds) { + clear(pm.bad) + pm.handleAddr(pm.seeds) + } + for k := range pm.good { + if _, ok := pm.peers[k]; ok { + // Address is in use + continue } - for k := range pm.good { - address = k + if _, ok := pm.bad[k]; ok { + // Should not happen but let's make sure we aren't + // reusing an address. + log.Errorf("found addres on bad list: %v", k) continue } + + // Remove from good list and add to bad list. Thus active peers + // are len(bad)-len(peers) + delete(pm.good, k) + pm.bad[k] = struct{}{} + return k, nil + } + return "", errors.New("no addresses") +} + +func (pm *PeerManager) connect(ctx context.Context, slot int) error { + log.Tracef("connect: %v", slot) + defer log.Tracef("connect exit: %v", slot) + + address, err := pm.randomAddress(ctx) + if err != nil { + return fmt.Errorf("random address: %v", err) + } + p, err := NewPeer(pm.net, address, slot) + if err != nil { + return fmt.Errorf("new peer: %v", err) + } + err = p.connect(ctx) + if err != nil { + return fmt.Errorf("new peer: %v", err) + } + + pm.mtx.Lock() + if _, ok := pm.peers[address]; ok { + // This race does indeed happen because Good can add this. 
+ p.close() // close new peer and don't add it + log.Errorf("peer already connected: %v", address) pm.mtx.Unlock() + return nil // XXX or else we free the slot UGH + } + pm.peers[address] = p + pm.mtx.Unlock() - if len(address) > 0 { - // connect peer - p, err := NewPeer(pm.net, address) - if err != nil { - // XXX can't happen, remove error case from NewPeer - log.Errorf("%v", err) - continue - } - err = p.connect(ctx) - if err != nil { - pm.Bad(address) - continue - } - pm.Connected(p) - return p, nil - } + pm.peersC <- p // block - // Block but do timeout to see if something was reaped - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(3 * time.Second): - } + return nil +} + +func (pm *PeerManager) connectSlot(ctx context.Context, slot int) { + err := pm.connect(ctx, slot) + if err != nil { + pm.slotsC <- slot // give slot back + return } } @@ -337,8 +371,18 @@ func (pm *PeerManager) Run(ctx context.Context) error { log.Infof("Starting peer manager") defer log.Infof("Peer manager stopped") - select { - case <-ctx.Done(): - return ctx.Err() + // Start connecting "want" number of peers. + pm.slotsC = make(chan int, pm.want) + for i := 0; i < pm.want; i++ { + pm.slotsC <- i + } + for { + select { + case slot := <-pm.slotsC: + go pm.connectSlot(ctx, slot) + + case <-ctx.Done(): + return ctx.Err() + } } } diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 525da39f..ccfcc496 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -626,7 +626,7 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error { defer func() { log.Tracef("handlePeer exit %v", p) - s.pm.Bad(p.String()) // always close peer + s.pm.Bad(ctx, p.String()) // always close peer }() // See if our tip is indeed canonical. @@ -1907,7 +1907,7 @@ func (s *Server) Run(pctx context.Context) error { go func(pp *peer) { err := s.handlePeer(ctx, pp) if err != nil { - log.Errorf("%v: %v", pp, err) + log.Debugf("%v: %v", pp, err) } }(p) } From c33d72ca8b75bb8d26fbe8db7f7d1fdea88848cf Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Thu, 10 Oct 2024 18:32:33 +0100 Subject: [PATCH 08/21] This seems to make providing seeds work correctly --- service/tbc/peer.go | 4 +- service/tbc/peer_manager.go | 94 ++++++++++++++++++++----------------- service/tbc/tbc.go | 4 +- 3 files changed, 55 insertions(+), 47 deletions(-) diff --git a/service/tbc/peer.go b/service/tbc/peer.go index b624a1d1..88f77076 100644 --- a/service/tbc/peer.go +++ b/service/tbc/peer.go @@ -48,10 +48,10 @@ type peer struct { addrV2 bool } -func NewPeer(network wire.BitcoinNet, address string, id int) (*peer, error) { +func NewPeer(network wire.BitcoinNet, id int, address string) (*peer, error) { _, _, err := net.SplitHostPort(address) if err != nil { - return nil, err + return nil, fmt.Errorf("%v: %w", address, err) } return &peer{ protocolVersion: wire.ProtocolVersion, diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go index 1d78ee02..082bc2d7 100644 --- a/service/tbc/peer_manager.go +++ b/service/tbc/peer_manager.go @@ -58,7 +58,7 @@ type PeerManager struct { } // NewPeerManager returns a new peer manager. 
From c33d72ca8b75bb8d26fbe8db7f7d1fdea88848cf Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 18:32:33 +0100
Subject: [PATCH 08/21] This seems to make providing seeds work correctly

---
 service/tbc/peer.go         |  4 +-
 service/tbc/peer_manager.go | 94 ++++++++++++++++++++-----------------
 service/tbc/tbc.go          |  4 +-
 3 files changed, 55 insertions(+), 47 deletions(-)

diff --git a/service/tbc/peer.go b/service/tbc/peer.go
index b624a1d1..88f77076 100644
--- a/service/tbc/peer.go
+++ b/service/tbc/peer.go
@@ -48,10 +48,10 @@ type peer struct {
     addrV2 bool
 }
 
-func NewPeer(network wire.BitcoinNet, address string, id int) (*peer, error) {
+func NewPeer(network wire.BitcoinNet, id int, address string) (*peer, error) {
     _, _, err := net.SplitHostPort(address)
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("%v: %w", address, err)
     }
     return &peer{
         protocolVersion: wire.ProtocolVersion,
diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index 1d78ee02..082bc2d7 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -58,7 +58,7 @@ type PeerManager struct {
 }
 
 // NewPeerManager returns a new peer manager.
-func NewPeerManager(net wire.BitcoinNet, want int) (*PeerManager, error) {
+func NewPeerManager(net wire.BitcoinNet, seeds []string, want int) (*PeerManager, error) {
     if want == 0 {
         return nil, errors.New("peers wanted must not be 0")
     }
@@ -78,6 +78,7 @@ func NewPeerManager(net wire.BitcoinNet, want int) (*PeerManager, error) {
         net:      net,
         want:     want,
         dnsSeeds: dnsSeeds,
+        seeds:    seeds,
         good:     make(map[string]struct{}, maxPeersGood),
         bad:      make(map[string]struct{}, maxPeersBad),
         peers:    make(map[string]*peer, want),
@@ -272,18 +273,23 @@ func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) {
     }
 }
 
-func (pm *PeerManager) randomAddress(ctx context.Context) (string, error) {
+func (pm *PeerManager) randomPeer(ctx context.Context, slot int) (*peer, error) {
     pm.mtx.Lock()
     defer pm.mtx.Unlock()
 
     // Reset caluse
-    if len(pm.good) < len(pm.seeds) && len(pm.bad) > len(pm.seeds) {
+    // log.Infof("good %v bad %v seeds %v", len(pm.good), len(pm.bad), len(pm.seeds))
+    if len(pm.good) < len(pm.seeds) && len(pm.bad) >= len(pm.seeds) {
+        // Return an error to make the caller aware that we have reset
+        // back to seeds.
         clear(pm.bad)
         pm.handleAddr(pm.seeds)
+        return nil, errors.New("reset")
     }
     for k := range pm.good {
         if _, ok := pm.peers[k]; ok {
             // Address is in use
+            log.Errorf("address already on peers list: %v", k)
             continue
         }
         if _, ok := pm.bad[k]; ok {
@@ -297,37 +303,30 @@ func (pm *PeerManager) randomAddress(ctx context.Context) (string, error) {
         // are len(bad)-len(peers)
         delete(pm.good, k)
         pm.bad[k] = struct{}{}
-        return k, nil
+
+        return NewPeer(pm.net, slot, k)
     }
-    return "", errors.New("no addresses")
+    return nil, errors.New("no addresses")
 }
 
-func (pm *PeerManager) connect(ctx context.Context, slot int) error {
-    log.Tracef("connect: %v", slot)
-    defer log.Tracef("connect exit: %v", slot)
+func (pm *PeerManager) connect(ctx context.Context, p *peer) error {
+    log.Tracef("connect: %v %v", p.Id(), p)
+    defer log.Tracef("connect exit: %v %v", p.Id(), p)
 
-    address, err := pm.randomAddress(ctx)
-    if err != nil {
-        return fmt.Errorf("random address: %v", err)
-    }
-    p, err := NewPeer(pm.net, address, slot)
-    if err != nil {
-        return fmt.Errorf("new peer: %v", err)
-    }
-    err = p.connect(ctx)
+    err := p.connect(ctx)
     if err != nil {
         return fmt.Errorf("new peer: %v", err)
     }
 
     pm.mtx.Lock()
-    if _, ok := pm.peers[address]; ok {
+    if _, ok := pm.peers[p.String()]; ok {
         // This race does indeed happen because Good can add this.
+        p.close() // close new peer and don't add it
-        log.Errorf("peer already connected: %v", address)
+        log.Errorf("peer already connected: %v", p)
         pm.mtx.Unlock()
-        return nil // XXX or else we free the slot UGH
+        return fmt.Errorf("connect %w", err)
     }
-    pm.peers[address] = p
+    pm.peers[p.String()] = p
     pm.mtx.Unlock()
 
     pm.peersC <- p // block
@@ -335,10 +334,11 @@ func (pm *PeerManager) connect(ctx context.Context, slot int) error {
     return nil
 }
 
-func (pm *PeerManager) connectSlot(ctx context.Context, slot int) {
-    err := pm.connect(ctx, slot)
+func (pm *PeerManager) connectSlot(ctx context.Context, p *peer) {
+    err := pm.connect(ctx, p)
     if err != nil {
-        pm.slotsC <- slot // give slot back
+        // log.Errorf("%v", err)
+        pm.slotsC <- p.Id() // give slot back
         return
     }
 }
@@ -347,26 +347,28 @@ func (pm *PeerManager) Run(ctx context.Context) error {
     log.Tracef("Run")
     defer log.Tracef("Run")
 
-    log.Infof("Starting DNS seeder")
-    minW := 5
-    maxW := 59
-    for {
-        err := pm.seed(ctx)
-        if err != nil {
-            log.Debugf("seed: %v", err)
-        } else {
-            break
-        }
-
-        holdOff := time.Duration(minW+rand.IntN(maxW-minW)) * time.Second
-        select {
-        case <-ctx.Done():
-            return ctx.Err()
-        case <-time.After(holdOff):
+    if len(pm.seeds) == 0 {
+        log.Infof("Starting DNS seeder")
+        minW := 5
+        maxW := 59
+        for {
+            err := pm.seed(ctx)
+            if err != nil {
+                log.Debugf("seed: %v", err)
+            } else {
+                break
+            }
+
+            holdOff := time.Duration(minW+rand.IntN(maxW-minW)) * time.Second
+            select {
+            case <-ctx.Done():
+                return ctx.Err()
+            case <-time.After(holdOff):
+            }
         }
+        log.Infof("DNS seeding complete")
     }
     pm.HandleAddr(pm.seeds) // Add all seeds to good list
-    log.Infof("DNS seeding complete")
 
     log.Infof("Starting peer manager")
     defer log.Infof("Peer manager stopped")
@@ -379,9 +381,17 @@ func (pm *PeerManager) Run(ctx context.Context) error {
     for {
         select {
         case slot := <-pm.slotsC:
-            go pm.connectSlot(ctx, slot)
+            p, err := pm.randomPeer(ctx, slot)
+            if err != nil {
+                // basically no addresses, hold-off
+                <-time.After(7 * time.Second)
+                pm.slotsC <- slot // give the slot back
+                continue
+            }
+            go pm.connectSlot(ctx, p)
 
         case <-ctx.Done():
+            log.Infof("exit")
             return ctx.Err()
         }
     }
diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go
index ccfcc496..4a9090a3 100644
--- a/service/tbc/tbc.go
+++ b/service/tbc/tbc.go
@@ -204,13 +204,11 @@ func NewServer(cfg *Config) (*Server, error) {
         s.checkpoints = make(map[chainhash.Hash]uint64)
         wanted = 1
 
-    // XXX currently broken
-
     default:
         return nil, fmt.Errorf("invalid network: %v", cfg.Network)
     }
 
-    pm, err := NewPeerManager(s.wireNet, wanted)
+    pm, err := NewPeerManager(s.wireNet, s.cfg.Seeds, wanted)
     if err != nil {
         return nil, err
     }

From e492fc69fbb8beb01dba1909a789008c6c3e2c5b Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 18:37:32 +0100
Subject: [PATCH 09/21] Missing defer in mutex, thanks Joshua

---
 service/tbc/peer_manager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index 082bc2d7..8cce0d9b 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -249,7 +249,7 @@ func (pm *PeerManager) Random() (*peer, error) {
     defer log.Tracef("Random exit")
 
     pm.mtx.RLock()
-    pm.mtx.RUnlock()
+    defer pm.mtx.RUnlock()
     for _, p := range pm.peers {
         if p.isConnected() {
             return p, nil
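PATCH 09 above is a one-line change, but the bug it fixes is easy to miss: calling pm.mtx.RUnlock() immediately after pm.mtx.RLock() releases the read lock before the loop over pm.peers runs, so the map is read with no lock held and can race with writers such as connect. With defer, the read lock is held until the function returns. A minimal illustration with hypothetical names (not the project's code); the unsafe variant is reported by the race detector once another goroutine mutates the map concurrently.

package main

import (
    "sync"
    "time"
)

type registry struct {
    mtx   sync.RWMutex
    peers map[string]bool
}

// countUnsafe mimics the bug: the read lock is released on the next line,
// so the range below runs without any lock held.
func (r *registry) countUnsafe() int {
    r.mtx.RLock()
    r.mtx.RUnlock()
    n := 0
    for range r.peers {
        n++
    }
    return n
}

// countSafe holds the read lock for the rest of the function, as PATCH 09 does.
func (r *registry) countSafe() int {
    r.mtx.RLock()
    defer r.mtx.RUnlock()
    n := 0
    for range r.peers {
        n++
    }
    return n
}

func main() {
    r := &registry{peers: make(map[string]bool)}
    go func() {
        for {
            r.mtx.Lock()
            r.peers["peer"] = !r.peers["peer"]
            r.mtx.Unlock()
        }
    }()
    for start := time.Now(); time.Since(start) < 100*time.Millisecond; {
        _ = r.countSafe()   // fine
        _ = r.countUnsafe() // flagged by `go run -race .`
    }
}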
From 55a1db147913cd27f797eec4c782e8d5ec58c76c Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 18:38:58 +0100
Subject: [PATCH 10/21] Remove debug

---
 service/tbc/peer_manager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index 8cce0d9b..6013817b 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -250,11 +250,11 @@ func (pm *PeerManager) Random() (*peer, error) {
     pm.mtx.RLock()
     defer pm.mtx.RUnlock()
+
     for _, p := range pm.peers {
         if p.isConnected() {
             return p, nil
         }
-        log.Errorf("not connected: %v", p) // XXX
     }
 
     return nil, errors.New("no peers")

From b089d71d0aa9d0399abd50b68537c501c21fa03f Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 18:42:13 +0100
Subject: [PATCH 11/21] Joshua suggestions

---
 service/tbc/peer_manager.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index 6013817b..0dac11dc 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -59,7 +59,7 @@ type PeerManager struct {
 
 // NewPeerManager returns a new peer manager.
 func NewPeerManager(net wire.BitcoinNet, seeds []string, want int) (*PeerManager, error) {
-    if want == 0 {
+    if want < 1 {
         return nil, errors.New("peers wanted must not be 0")
     }
@@ -244,6 +244,7 @@ func (pm *PeerManager) Bad(ctx context.Context, address string) error {
     return nil
 }
 
+// Random returns a random connected peer.
 func (pm *PeerManager) Random() (*peer, error) {
     log.Tracef("Random")
     defer log.Tracef("Random exit")
@@ -260,6 +261,7 @@ func (pm *PeerManager) Random() (*peer, error) {
     return nil, errors.New("no peers")
 }
 
+// RandomConnect blocks until there is a peer ready to use.
 func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) {
     log.Tracef("RandomConnect")
     defer log.Tracef("RandomConnect")

From d2b100d03337b1620af8610c2874974dfbd4d26f Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 18:53:08 +0100
Subject: [PATCH 12/21] Catch up to go 1.23 to get nifty new tcp dial options and fix tests

---
 go.mod                          | 2 +-
 service/tbc/peermanager_test.go | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index f3f79938..71170ad3 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/hemilabs/heminetwork
 
-go 1.22
+go 1.23
 
 toolchain go1.23.0
diff --git a/service/tbc/peermanager_test.go b/service/tbc/peermanager_test.go
index e8c53186..a84a2d2f 100644
--- a/service/tbc/peermanager_test.go
+++ b/service/tbc/peermanager_test.go
@@ -37,7 +37,7 @@ func ping(ctx context.Context, t *testing.T, p *peer) error {
 func TestPeerManager(t *testing.T) {
     want := 2
     wantLoop := want * 2
-    pm, err := NewPeerManager(wire.TestNet3, want)
+    pm, err := NewPeerManager(wire.TestNet3, []string{}, want)
     if err != nil {
         t.Fatal(err)
     }
@@ -66,7 +66,7 @@ func TestPeerManager(t *testing.T) {
         t.Logf("ping returned error but that's fine: %v", err)
     }
     // Always close
-    err = pm.Bad(pp.String())
+    err = pm.Bad(ctx, pp.String())
     if err != nil {
         panic(err)
     }

From 382d10317e9976faf18afd332f01d660e649c626 Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 19:03:39 +0100
Subject: [PATCH 13/21] Fix test

---
 service/tbc/peermanager_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/service/tbc/peermanager_test.go b/service/tbc/peermanager_test.go
index a84a2d2f..bdc3b6ad 100644
--- a/service/tbc/peermanager_test.go
+++ b/service/tbc/peermanager_test.go
@@ -77,7 +77,7 @@ func TestPeerManager(t *testing.T) {
 
     wg.Wait()
 
-    if len(pm.bad) != wantLoop {
+    if len(pm.bad) < wantLoop {
         t.Fatalf("not enough bad, got %v wanted %v", len(pm.bad),
            wantLoop)
     }
     if len(pm.peers) != 0 {

From ba8c5945675985ce6d94423d536695139809ddf6 Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 22:07:27 +0100
Subject: [PATCH 14/21] Bring back ping all peers

---
 service/tbc/peer_manager.go | 15 ++++++++
 service/tbc/tbc.go          | 73 ++++++++++++++++++-------------------
 2 files changed, 51 insertions(+), 37 deletions(-)

diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index 0dac11dc..4a031ba1 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -261,6 +261,21 @@ func (pm *PeerManager) Random() (*peer, error) {
     return nil, errors.New("no peers")
 }
 
+// All runs a call back on all connected peers.
+func (pm *PeerManager) All(ctx context.Context, f func(ctx context.Context, p *peer)) {
+    log.Tracef("All")
+    defer log.Tracef("All")
+
+    pm.mtx.RLock()
+    defer pm.mtx.RUnlock()
+    for _, p := range pm.peers {
+        if !p.isConnected() {
+            continue
+        }
+        go f(ctx, p)
+    }
+}
+
 // RandomConnect blocks until there is a peer ready to use.
 func (pm *PeerManager) RandomConnect(ctx context.Context) (*peer, error) {
     log.Tracef("RandomConnect")
     defer log.Tracef("RandomConnect")
diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go
index 4a9090a3..43c43881 100644
--- a/service/tbc/tbc.go
+++ b/service/tbc/tbc.go
@@ -245,43 +245,27 @@ func (s *Server) pingExpired(ctx context.Context, key any, value any) {
     }
 }
 
-// XXX BRING THIS BACK
-//func (s *Server) pingAllPeers(ctx context.Context) {
-//    log.Tracef("pingAllPeers")
-//    defer log.Tracef("pingAllPeers exit")
-//
-//    // XXX reason and explain why this cannot be reentrant
-//    s.mtx.Lock()
-//    defer s.mtx.Unlock()
-//
-//    for _, p := range s.peers {
-//        select {
-//        case <-ctx.Done():
-//            return
-//        default:
-//        }
-//        if !p.isConnected() {
-//            continue
-//        }
-//
-//        // Cancel outstanding ping, should not happen
-//        peer := p.String()
-//        s.pings.Cancel(peer)
-//
-//        // We don't really care about the response. We just want to
-//        // write to the connection to make it fail if the other side
-//        // went away.
-//        log.Debugf("Pinging: %v", p)
-//        err := p.write(defaultCmdTimeout, wire.NewMsgPing(uint64(time.Now().Unix())))
-//        if err != nil {
-//            log.Debugf("ping %v: %v", p, err)
-//            return
-//        }
-//
-//        // Record outstanding ping
-//        s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil)
-//    }
-//}
+func (s *Server) pingPeer(ctx context.Context, p *peer) {
+    log.Tracef("pingPeer %v", p)
+    defer log.Tracef("pingPeer %v exit", p)
+
+    // Cancel outstanding ping, should not happen
+    peer := p.String()
+    s.pings.Cancel(peer)
+
+    // We don't really care about the response. We just want to
+    // write to the connection to make it fail if the other side
+    // went away.
+    log.Debugf("Pinging: %v", p)
+    err := p.write(defaultCmdTimeout, wire.NewMsgPing(uint64(time.Now().Unix())))
+    if err != nil {
+        log.Debugf("ping %v: %v", p, err)
+        return
+    }
+
+    // Record outstanding ping
+    s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil)
+}
 
 func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, raw []byte) (bool, error) {
     // Do accept addr and ping commands before we consider the peer up.
@@ -1892,6 +1876,7 @@ func (s *Server) Run(pctx context.Context) error {
         }
     }()
 
+    // connected peers
     s.wg.Add(1)
     go func() {
         defer s.wg.Done()
@@ -1911,6 +1896,20 @@ func (s *Server) Run(pctx context.Context) error {
         }
     }()
 
+    // ping loop
+    s.wg.Add(1)
+    go func() {
+        defer s.wg.Done()
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-time.After(13 * time.Second):
+            }
+            s.pm.All(ctx, s.pingPeer)
+        }
+    }()
+
     select {
     case <-ctx.Done():
         err = ctx.Err()

From 8ab583f199930b14abc2ecaed26a6da001033f4a Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 22:28:56 +0100
Subject: [PATCH 15/21] Deal with signals

---
 cmd/tbcd/tbcd.go   | 3 +++
 service/tbc/tbc.go | 6 +++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/cmd/tbcd/tbcd.go b/cmd/tbcd/tbcd.go
index 5ef41d8f..76a83550 100644
--- a/cmd/tbcd/tbcd.go
+++ b/cmd/tbcd/tbcd.go
@@ -121,6 +121,9 @@ var (
 )
 
 func HandleSignals(ctx context.Context, cancel context.CancelFunc, callback func(os.Signal)) {
+    log.Tracef("HandleSignals")
+    defer log.Tracef("HandleSignals exit")
+
     signalChan := make(chan os.Signal, 1)
     signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
     defer func() {
diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go
index 43c43881..954a5668 100644
--- a/service/tbc/tbc.go
+++ b/service/tbc/tbc.go
@@ -1012,7 +1012,7 @@ func (s *Server) syncBlocks(ctx context.Context) {
     }
     // XXX rethink closure, this is because of index flag mutex.
     go func() {
-        if err = s.SyncIndexersToBest(ctx); err != nil && err != ErrAlreadyIndexing {
+        if err = s.SyncIndexersToBest(ctx); err != nil && !errors.Is(err, ErrAlreadyIndexing) && errors.Is(err, context.Canceled) {
             // XXX this is probably not a panic.
             panic(fmt.Errorf("sync blocks: %w", err))
         }
@@ -1883,9 +1883,9 @@ func (s *Server) Run(pctx context.Context) error {
     for {
         p, err := s.pm.RandomConnect(ctx)
         if err != nil {
-            // Should not be reached
+            // Only reached when context is closed.
             log.Errorf("random connect: %v", err)
-            continue
+            return
         }
         go func(pp *peer) {
             err := s.handlePeer(ctx, pp)

From b4143a070734a580275ffda2e6e7fc22f7ac6dcb Mon Sep 17 00:00:00 2001
From: ClaytonNorthey92
Date: Thu, 10 Oct 2024 17:57:05 -0400
Subject: [PATCH 16/21] fix seeding, don't run test that connects to testnet3

---
 service/tbc/peermanager_test.go | 1 +
 service/tbc/tbc.go              | 5 ++++-
 service/tbc/tbc_test.go         | 1 +
 service/tbc/tbcfork_test.go     | 9 ++++++---
 4 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/service/tbc/peermanager_test.go b/service/tbc/peermanager_test.go
index bdc3b6ad..e750ee45 100644
--- a/service/tbc/peermanager_test.go
+++ b/service/tbc/peermanager_test.go
@@ -35,6 +35,7 @@ func ping(ctx context.Context, t *testing.T, p *peer) error {
 }
 
 func TestPeerManager(t *testing.T) {
+    t.Skip("this test connects to testnet3")
     want := 2
     wantLoop := want * 2
     pm, err := NewPeerManager(wire.TestNet3, []string{}, want)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go
index 954a5668..e4096cfc 100644
--- a/service/tbc/tbc.go
+++ b/service/tbc/tbc.go
@@ -1883,7 +1883,10 @@ func (s *Server) Run(pctx context.Context) error {
     for {
         p, err := s.pm.RandomConnect(ctx)
         if err != nil {
-            // Only reached when context is closed.
+            if errors.Is(err, context.Canceled) {
+                return
+            }
+            // Should not be reached
             log.Errorf("random connect: %v", err)
             return
         }
         go func(pp *peer) {
             err := s.handlePeer(ctx, pp)
diff --git a/service/tbc/tbc_test.go b/service/tbc/tbc_test.go
index 6f5ea087..5ed46f07 100644
--- a/service/tbc/tbc_test.go
+++ b/service/tbc/tbc_test.go
@@ -828,6 +828,7 @@ func createTbcServer(ctx context.Context, t *testing.T, mappedPeerPort nat.Port)
     cfg.LevelDBHome = home
     cfg.Network = networkLocalnet
     cfg.ListenAddress = tcbListenAddress
+    cfg.Seeds = localnetSeeds
     tbcServer, err := NewServer(cfg)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/service/tbc/tbcfork_test.go b/service/tbc/tbcfork_test.go
index ad79babc..5539e5e1 100644
--- a/service/tbc/tbcfork_test.go
+++ b/service/tbc/tbcfork_test.go
@@ -877,7 +877,7 @@ func TestFork(t *testing.T) {
         t.Fatal(err)
     }
     // t.Logf("%v", spew.Sdump(n.chain[n.Best()[0].String()]))
-    time.Sleep(500 * time.Millisecond) // XXX
+    time.Sleep(2 * time.Second) // XXX
 
     // Connect tbc service
     cfg := &Config{
@@ -892,6 +892,7 @@ func TestFork(t *testing.T) {
         Network:                 networkLocalnet,
         PeersWanted:             1,
         PrometheusListenAddress: "",
+        Seeds:                   []string{"127.0.0.1:18444"},
     }
     _ = loggo.ConfigureLoggers(cfg.LogLevel)
     s, err := NewServer(cfg)
@@ -1114,7 +1115,7 @@ func TestIndexNoFork(t *testing.T) {
             panic(err)
         }
     }()
-    time.Sleep(time.Second)
+    time.Sleep(time.Second * 2)
 
     // Connect tbc service
     cfg := &Config{
@@ -1129,6 +1130,7 @@ func TestIndexNoFork(t *testing.T) {
         Network:                 networkLocalnet,
         PeersWanted:             1,
         PrometheusListenAddress: "",
+        Seeds:                   []string{"127.0.0.1:18444"},
     }
     _ = loggo.ConfigureLoggers(cfg.LogLevel)
     s, err := NewServer(cfg)
@@ -1285,7 +1287,7 @@ func TestIndexFork(t *testing.T) {
             panic(err)
         }
     }()
-    time.Sleep(time.Second)
+    time.Sleep(time.Second * 2)
 
     // Connect tbc service
     cfg := &Config{
@@ -1301,6 +1303,7 @@ func TestIndexFork(t *testing.T) {
         PeersWanted:             1,
         PrometheusListenAddress: "",
         MempoolEnabled:          false,
+        Seeds:                   []string{"127.0.0.1:18444"},
     }
     _ = loggo.ConfigureLoggers(cfg.LogLevel)
     s, err := NewServer(cfg)

From fa7d0ef63211aa3d3d5a847d2bbfd4639e3b871d Mon Sep 17 00:00:00 2001
From: ClaytonNorthey92
Date: Thu, 10 Oct 2024 18:06:50 -0400
Subject: [PATCH 17/21] fix make race; do not use same host and port for each test

---
 service/tbc/tbcfork_test.go | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/service/tbc/tbcfork_test.go b/service/tbc/tbcfork_test.go
index 5539e5e1..a17d3fdc 100644
--- a/service/tbc/tbcfork_test.go
+++ b/service/tbc/tbcfork_test.go
@@ -27,7 +27,6 @@ import (
     "github.com/davecgh/go-spew/spew"
     "github.com/juju/loggo"
 
-    "github.com/hemilabs/heminetwork/api/tbcapi"
     "github.com/hemilabs/heminetwork/database/tbcd"
 )
@@ -886,13 +885,13 @@ func TestFork(t *testing.T) {
         BlockheaderCache:        1000,
         BlockSanity:             false,
         LevelDBHome:             t.TempDir(),
-        ListenAddress:           tbcapi.DefaultListen, // TODO: should use random free port
         // LogLevel:                "tbcd=TRACE:tbc=TRACE:level=DEBUG",
         MaxCachedTxs:            1000, // XXX
         Network:                 networkLocalnet,
         PeersWanted:             1,
         PrometheusListenAddress: "",
         Seeds:                   []string{"127.0.0.1:18444"},
+        ListenAddress:           "localhost:8881",
     }
     _ = loggo.ConfigureLoggers(cfg.LogLevel)
     s, err := NewServer(cfg)
@@ -1124,7 +1123,7 @@ func TestIndexNoFork(t *testing.T) {
         BlockheaderCache:        1000,
        BlockSanity:             false,
         LevelDBHome:             t.TempDir(),
-        ListenAddress:           tbcapi.DefaultListen,
+        ListenAddress:           "localhost:8882",
         // LogLevel:                "tbcd=TRACE:tbc=TRACE:level=DEBUG",
         MaxCachedTxs:            1000, // XXX
         Network:                 networkLocalnet,
@@ -1296,7 +1295,7 @@ func TestIndexFork(t *testing.T) {
         BlockheaderCache:        1000,
         BlockSanity:             false,
         LevelDBHome:             t.TempDir(),
-        ListenAddress:           tbcapi.DefaultListen,
+        ListenAddress:           "localhost:8883",
         // LogLevel:                "tbcd=TRACE:tbc=TRACE:level=DEBUG",
         MaxCachedTxs:            1000, // XXX
         Network:                 networkLocalnet,

From c9f17c3ea00cdabf591d3d98b3cdb84ba85f4494 Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Thu, 10 Oct 2024 23:09:47 +0100
Subject: [PATCH 18/21] Make it the same as bssd

---
 cmd/tbcd/tbcd.go | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/cmd/tbcd/tbcd.go b/cmd/tbcd/tbcd.go
index 76a83550..0bd0662d 100644
--- a/cmd/tbcd/tbcd.go
+++ b/cmd/tbcd/tbcd.go
@@ -121,9 +121,6 @@ var (
 )
 
 func HandleSignals(ctx context.Context, cancel context.CancelFunc, callback func(os.Signal)) {
-    log.Tracef("HandleSignals")
-    defer log.Tracef("HandleSignals exit")
-
     signalChan := make(chan os.Signal, 1)
     signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
     defer func() {
@@ -143,11 +140,6 @@ func HandleSignals(ctx context.Context, cancel context.CancelFunc, callback func
     os.Exit(2)
 }
 
-func init() {
-    version.Component = "tbcd"
-    welcome = "Hemi Tiny Bitcoin Daemon " + version.BuildInfo()
-}
-
 func _main() error {
     // Parse configuration from environment
     if err := config.Parse(cm); err != nil {
@@ -207,6 +199,11 @@ func _main() error {
     return nil
 }
 
+func init() {
+    version.Component = "tbcd"
+    welcome = "Hemi Tiny Bitcoin Daemon " + version.BuildInfo()
+}
+
 func main() {
     if len(os.Args) != 1 {
         fmt.Fprintf(os.Stderr, "%v\n", welcome)
@@ -218,7 +215,7 @@ func main() {
     }
 
     if err := _main(); err != nil {
-        log.Errorf("%v", err)
+        fmt.Fprintf(os.Stderr, "%v\n", err)
         os.Exit(1)
     }
 }

From 541a2268b8079a409d23b230b24f44accc573282 Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Fri, 11 Oct 2024 14:02:14 +0100
Subject: [PATCH 19/21] Oops, forgot to commit this

---
 service/tbc/peer.go         | 3 ---
 service/tbc/peer_manager.go | 2 +-
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/service/tbc/peer.go b/service/tbc/peer.go
index 88f77076..582258f0 100644
--- a/service/tbc/peer.go
+++ b/service/tbc/peer.go
@@ -224,9 +224,6 @@ func (p *peer) close() error {
     log.Tracef("close")
     defer log.Tracef("close exit")
 
-    if p == nil {
-        panic("p")
-    }
     p.mtx.Lock()
     conn := p.conn
     p.conn = nil
diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index 4a031ba1..f44c405a 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -258,7 +258,7 @@ func (pm *PeerManager) Random() (*peer, error) {
         }
     }
 
-    return nil, errors.New("no peers")
+    return nil, errors.New("no connected peers")
 }
 
 // All runs a call back on all connected peers.

From 057a7f5c70dc614a100adac663c9f8185ff1d4e2 Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Fri, 11 Oct 2024 14:20:35 +0100
Subject: [PATCH 20/21] sundry joshua nits

---
 service/tbc/peer_manager.go | 30 +++++++++++++++++-------------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index f44c405a..5ee446c3 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -36,6 +36,11 @@ var (
         "seed.bitnodes.io:8333",
         "seed.bitcoin.jonasschnelli.ch:8333",
     }
+
+    ErrReset = errors.New("reset")
+    ErrNoAddresses = errors.New("no addresses")
+    ErrDNSSeed = errors.New("could not dns seed")
+    ErrNoConnectedPeers = errors.New("no connected peers")
 )
 
 // PeerManager keeps track of the available peers and their quality.
@@ -103,18 +108,15 @@ func (pm *PeerManager) seed(pctx context.Context) error {
     ctx, cancel := context.WithTimeout(pctx, 15*time.Second)
     defer cancel()
 
-    errorsSeen := 0
     for _, v := range pm.dnsSeeds {
         host, port, err := net.SplitHostPort(v)
         if err != nil {
             log.Errorf("Failed to parse host/port: %v", err)
-            errorsSeen++
             continue
         }
         ips, err := resolver.LookupIP(ctx, "ip", host)
         if err != nil {
             log.Errorf("lookup: %v", err)
-            errorsSeen++
             continue
         }
@@ -125,7 +127,7 @@ func (pm *PeerManager) seed(pctx context.Context) error {
     }
 
     if len(pm.seeds) == 0 {
-        return errors.New("could not dns seed")
+        return ErrDNSSeed
     }
 
     return nil
@@ -133,14 +135,17 @@ func (pm *PeerManager) seed(pctx context.Context) error {
 
 // Stats returns peer statistics.
 func (pm *PeerManager) Stats() (int, int, int) {
-    log.Tracef("PeersStats")
-    defer log.Tracef("PeersStats exit")
+    log.Tracef("Stats")
+    defer log.Tracef("Stats exit")
 
     pm.mtx.RLock()
     defer pm.mtx.RUnlock()
     return len(pm.peers), len(pm.good), len(pm.bad)
 }
 
+// handleAddr adds peers to the good list if they do not exist in the connected
+// and bad list.
+// Note that this function requires the mutex to be held.
 func (pm *PeerManager) handleAddr(peers []string) {
     for _, addr := range peers {
         _, _, err := net.SplitHostPort(addr)
@@ -170,7 +175,7 @@ func (pm *PeerManager) handleAddr(peers []string) {
     pm.handleAddr(peers)
 }
 
-// Good adds peer good list.
+// Good adds peer to good list if it does not exist in connected and good list already.
 func (pm *PeerManager) Good(address string) error {
     log.Tracef("Good")
     defer log.Tracef("Good exit")
@@ -258,7 +263,7 @@ func (pm *PeerManager) Random() (*peer, error) {
         }
     }
 
-    return nil, errors.New("no connected peers")
+    return nil, ErrNoConnectedPeers
 }
 
 // All runs a call back on all connected peers.
@@ -301,7 +306,7 @@ func (pm *PeerManager) randomPeer(ctx context.Context, slot int) (*peer, error)
         // back to seeds.
        clear(pm.bad)
         pm.handleAddr(pm.seeds)
-        return nil, errors.New("reset")
+        return nil, ErrReset
     }
     for k := range pm.good {
         if _, ok := pm.peers[k]; ok {
@@ -323,7 +328,7 @@ func (pm *PeerManager) randomPeer(ctx context.Context, slot int) (*peer, error)
 
         return NewPeer(pm.net, slot, k)
     }
-    return nil, errors.New("no addresses")
+    return nil, ErrNoAddresses
 }
 
 func (pm *PeerManager) connect(ctx context.Context, p *peer) error {
@@ -341,7 +346,7 @@ func (pm *PeerManager) connect(ctx context.Context, p *peer) error {
         p.close() // close new peer and don't add it
         log.Errorf("peer already connected: %v", p)
         pm.mtx.Unlock()
-        return fmt.Errorf("connect %w", err)
+        return fmt.Errorf("connect: %w", err)
     }
     pm.peers[p.String()] = p
     pm.mtx.Unlock()
@@ -352,8 +357,7 @@ func (pm *PeerManager) connect(ctx context.Context, p *peer) error {
 }
 
 func (pm *PeerManager) connectSlot(ctx context.Context, p *peer) {
-    err := pm.connect(ctx, p)
-    if err != nil {
+    if err := pm.connect(ctx, p); err != nil {
         // log.Errorf("%v", err)
         pm.slotsC <- p.Id() // give slot back
         return

From c53e517ba52b87330632e229cfe5e6eb77438c50 Mon Sep 17 00:00:00 2001
From: Marco Peereboom
Date: Fri, 11 Oct 2024 14:22:52 +0100
Subject: [PATCH 21/21] Fix error

---
 service/tbc/peer_manager.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/service/tbc/peer_manager.go b/service/tbc/peer_manager.go
index 5ee446c3..bd5072ae 100644
--- a/service/tbc/peer_manager.go
+++ b/service/tbc/peer_manager.go
@@ -335,8 +335,7 @@ func (pm *PeerManager) connect(ctx context.Context, p *peer) error {
     log.Tracef("connect: %v %v", p.Id(), p)
     defer log.Tracef("connect exit: %v %v", p.Id(), p)
 
-    err := p.connect(ctx)
-    if err != nil {
+    if err := p.connect(ctx); err != nil {
         return fmt.Errorf("new peer: %v", err)
     }
 
@@ -346,7 +345,7 @@ func (pm *PeerManager) connect(ctx context.Context, p *peer) error {
         p.close() // close new peer and don't add it
         log.Errorf("peer already connected: %v", p)
         pm.mtx.Unlock()
-        return fmt.Errorf("connect: %w", err)
+        return fmt.Errorf("peer already connected: %v", p)
     }
     pm.peers[p.String()] = p
     pm.mtx.Unlock()
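Taken together, the series leaves tbc driving the reworked peer manager roughly as follows. This is a condensed recap of the call sites shown in the diffs above, not a compilable excerpt: `handle` stands in for `handlePeer`, `wireNet` and `peersWanted` for the corresponding Server fields, error handling is trimmed, and launching Run in its own goroutine is an assumption since the series does not show how Run is wired into the server.

pm, err := NewPeerManager(wireNet, cfg.Seeds, peersWanted)
if err != nil {
    return err
}
go pm.Run(ctx) // DNS-seeds only when cfg.Seeds is empty, then fills the connect slots

for {
    p, err := pm.RandomConnect(ctx) // blocks until a freshly connected peer is handed over
    if err != nil {
        return err // only happens once ctx is canceled
    }
    go func(p *peer) {
        defer pm.Bad(ctx, p.String()) // always recycle the address and slot on exit
        handle(ctx, p)
    }(p)
}

The division of labour after the series: Run owns seeding and dialing, RandomConnect is the only way callers obtain peers, and Bad is the single teardown path that closes the connection, moves the address to the bad list, and returns the slot.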