From 32d68b361b93cd774ec0169d30f6c72ba355e442 Mon Sep 17 00:00:00 2001 From: ziggie Date: Wed, 8 May 2024 20:22:21 +0100 Subject: [PATCH] multi: Add atomic start/stop functions. Make sure that each subsystem only starts and stops once. This makes sure we don't close e.g. quit channels twice. --- chanfitness/chaneventstore.go | 24 ++++++++++++++--- htlcswitch/interceptable_switch.go | 20 ++++++++++++++ invoices/invoiceregistry.go | 43 ++++++++++++++++++++++-------- sweep/fee_bumper.go | 23 +++++++++++++--- 4 files changed, 92 insertions(+), 18 deletions(-) diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index c3aa25efd9e..a882f1bda95 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -12,7 +12,9 @@ package chanfitness import ( "errors" + "fmt" "sync" + "sync/atomic" "time" "github.com/btcsuite/btcd/wire" @@ -48,6 +50,9 @@ var ( // ChannelEventStore maintains a set of event logs for the node's channels to // provide insight into the performance and health of channels. type ChannelEventStore struct { + started atomic.Bool + stopped atomic.Bool + cfg *Config // peers tracks all of our currently monitored peers and their channels. @@ -142,7 +147,11 @@ func NewChannelEventStore(config *Config) *ChannelEventStore { // information from the store. If this function fails, it cancels its existing // subscriptions and returns an error. func (c *ChannelEventStore) Start() error { - log.Info("ChannelEventStore starting") + log.Info("ChannelEventStore starting...") + + if c.started.Swap(true) { + return fmt.Errorf("ChannelEventStore started more than once") + } // Create a subscription to channel events. channelClient, err := c.cfg.SubscribeChannelEvents() @@ -198,13 +207,18 @@ func (c *ChannelEventStore) Start() error { cancel: cancel, }) + log.Info("ChannelEventStore started") + return nil } // Stop terminates all goroutines started by the event store. 
-func (c *ChannelEventStore) Stop() { +func (c *ChannelEventStore) Stop() error { log.Info("ChannelEventStore shutting down...") - defer log.Debug("ChannelEventStore shutdown complete") + + if c.stopped.Swap(true) { + return fmt.Errorf("ChannelEventStore stopped more than once") + } // Stop the consume goroutine. close(c.quit) @@ -213,6 +227,10 @@ func (c *ChannelEventStore) Stop() { // Stop the ticker after the goroutine reading from it has exited, to // avoid a race. c.cfg.FlapCountTicker.Stop() + + log.Infof("ChannelEventStore shutdown complete") + + return nil } // addChannel checks whether we are already tracking a channel's peer, creates a diff --git a/htlcswitch/interceptable_switch.go b/htlcswitch/interceptable_switch.go index 62f7c93dd8f..812e82abd2c 100644 --- a/htlcswitch/interceptable_switch.go +++ b/htlcswitch/interceptable_switch.go @@ -4,6 +4,7 @@ import ( "crypto/sha256" "fmt" "sync" + "sync/atomic" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" @@ -33,6 +34,9 @@ var ( // Settle - routes UpdateFulfillHTLC to the originating link. // Fail - routes UpdateFailHTLC to the originating link. 
type InterceptableSwitch struct { + started atomic.Bool + stopped atomic.Bool + // htlcSwitch is the underline switch htlcSwitch *Switch @@ -201,6 +205,12 @@ func (s *InterceptableSwitch) SetInterceptor( } func (s *InterceptableSwitch) Start() error { + log.Info("InterceptableSwitch starting...") + + if s.started.Swap(true) { + return fmt.Errorf("InterceptableSwitch started more than once") + } + blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err @@ -217,15 +227,25 @@ func (s *InterceptableSwitch) Start() error { } }() + log.Info("InterceptableSwitch started") + return nil } func (s *InterceptableSwitch) Stop() error { + log.Info("InterceptableSwitch shutting down...") + + if s.stopped.Swap(true) { + return fmt.Errorf("InterceptableSwitch stopped more than once") + } + close(s.quit) s.wg.Wait() s.blockEpochStream.Cancel() + log.Infof("InterceptableSwitch shutdown complete") + return nil } diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index de731b4740b..2b3f85d442a 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -101,6 +101,9 @@ func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool { // created by the daemon. The registry is a thin wrapper around a map in order // to ensure that all updates/reads are thread safe. type InvoiceRegistry struct { + started atomic.Bool + stopped atomic.Bool + sync.RWMutex nextClientID uint32 // must be used atomically @@ -213,33 +216,48 @@ func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error { // Start starts the registry and all goroutines it needs to carry out its task. func (i *InvoiceRegistry) Start() error { - // Start InvoiceExpiryWatcher and prepopulate it with existing active - // invoices. 
- err := i.expiryWatcher.Start(func(hash lntypes.Hash, force bool) error { - return i.cancelInvoiceImpl(context.Background(), hash, force) - }) + var err error + + log.Info("InvoiceRegistry starting...") + + if i.started.Swap(true) { + return fmt.Errorf("InvoiceRegistry started more than once") + } + // Start InvoiceExpiryWatcher and prepopulate it with existing + // active invoices. + err = i.expiryWatcher.Start( + func(hash lntypes.Hash, force bool) error { + return i.cancelInvoiceImpl( + context.Background(), hash, force, + ) + }) if err != nil { return err } - log.Info("InvoiceRegistry starting") - i.wg.Add(1) go i.invoiceEventLoop() - // Now scan all pending and removable invoices to the expiry watcher or - // delete them. + // Now scan all pending and removable invoices to the expiry + // watcher or delete them. err = i.scanInvoicesOnStart(context.Background()) if err != nil { _ = i.Stop() - return err } - return nil + log.Info("InvoiceRegistry started") + + return err } // Stop signals the registry for a graceful shutdown. func (i *InvoiceRegistry) Stop() error { + log.Info("InvoiceRegistry shutting down...") + + if i.stopped.Swap(true) { + return fmt.Errorf("InvoiceRegistry stopped more than once") + } + log.Info("InvoiceRegistry shutting down...") defer log.Debug("InvoiceRegistry shutdown complete") @@ -248,6 +266,9 @@ func (i *InvoiceRegistry) Stop() error { close(i.quit) i.wg.Wait() + + log.Infof("InvoiceRegistry shutdown complete") + return nil } diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index e7928016698..f520709ad58 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -262,6 +262,9 @@ type TxPublisherConfig struct { // until the tx is confirmed or the fee rate reaches the maximum fee rate // specified by the caller. type TxPublisher struct { + started atomic.Bool + stopped atomic.Bool + wg sync.WaitGroup // cfg specifies the configuration of the TxPublisher. 
@@ -669,7 +672,10 @@ type monitorRecord struct { // off the monitor loop. func (t *TxPublisher) Start() error { log.Info("TxPublisher starting...") - defer log.Debugf("TxPublisher started") + + if t.started.Swap(true) { + return fmt.Errorf("TxPublisher started more than once") + } blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { @@ -679,17 +685,26 @@ func (t *TxPublisher) Start() error { t.wg.Add(1) go t.monitor(blockEvent) + log.Debugf("TxPublisher started") + return nil } // Stop stops the publisher and waits for the monitor loop to exit. -func (t *TxPublisher) Stop() { +func (t *TxPublisher) Stop() error { log.Info("TxPublisher stopping...") - defer log.Debugf("TxPublisher stopped") - close(t.quit) + if t.stopped.Swap(true) { + return fmt.Errorf("TxPublisher stopped more than once") + } + close(t.quit) t.wg.Wait() + + log.Info("TxPublisher stopped") + + return nil + } // monitor is the main loop driven by new blocks. Whevenr a new block arrives,