Skip to content

Commit

Permalink
multi: Add atomic start/stop functions.
Browse files Browse the repository at this point in the history
Make sure that each subsystem only starts and stop once. This makes
sure we don't close e.g. quit channels twice.
  • Loading branch information
ziggie1984 committed Jul 11, 2024
1 parent f464dac commit 32d68b3
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 18 deletions.
24 changes: 21 additions & 3 deletions chanfitness/chaneventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ package chanfitness

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -48,6 +50,9 @@ var (
// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
started atomic.Bool
stopped atomic.Bool

cfg *Config

// peers tracks all of our currently monitored peers and their channels.
Expand Down Expand Up @@ -142,7 +147,11 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
log.Info("ChannelEventStore starting")
log.Info("ChannelEventStore starting...")

if c.started.Swap(true) {
return fmt.Errorf("ChannelEventStore started more than once")
}

// Create a subscription to channel events.
channelClient, err := c.cfg.SubscribeChannelEvents()
Expand Down Expand Up @@ -198,13 +207,18 @@ func (c *ChannelEventStore) Start() error {
cancel: cancel,
})

log.Info("ChannelEventStore started")

return nil
}

// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() {
func (c *ChannelEventStore) Stop() error {
log.Info("ChannelEventStore shutting down...")
defer log.Debug("ChannelEventStore shutdown complete")

if c.stopped.Swap(true) {
return fmt.Errorf("ChannelEventStore stopped more than once")
}

// Stop the consume goroutine.
close(c.quit)
Expand All @@ -213,6 +227,10 @@ func (c *ChannelEventStore) Stop() {
// Stop the ticker after the goroutine reading from it has exited, to
// avoid a race.
c.cfg.FlapCountTicker.Stop()

log.Infof("ChannelEventStore shutdown complete")

return nil
}

// addChannel checks whether we are already tracking a channel's peer, creates a
Expand Down
20 changes: 20 additions & 0 deletions htlcswitch/interceptable_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"fmt"
"sync"
"sync/atomic"

"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
Expand Down Expand Up @@ -33,6 +34,9 @@ var (
// Settle - routes UpdateFulfillHTLC to the originating link.
// Fail - routes UpdateFailHTLC to the originating link.
type InterceptableSwitch struct {
started atomic.Bool
stopped atomic.Bool

// htlcSwitch is the underline switch
htlcSwitch *Switch

Expand Down Expand Up @@ -201,6 +205,12 @@ func (s *InterceptableSwitch) SetInterceptor(
}

func (s *InterceptableSwitch) Start() error {
log.Info("InterceptableSwitch starting...")

if s.started.Swap(true) {
return fmt.Errorf("InterceptableSwitch started more than once")
}

blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
Expand All @@ -217,15 +227,25 @@ func (s *InterceptableSwitch) Start() error {
}
}()

log.Info("InterceptableSwitch started")

return nil
}

func (s *InterceptableSwitch) Stop() error {
log.Info("InterceptableSwitch shutting down...")

if s.stopped.Swap(true) {
return fmt.Errorf("InterceptableSwitch stopped more than once")
}

close(s.quit)
s.wg.Wait()

s.blockEpochStream.Cancel()

log.Infof("InterceptableSwitch shutdown complete")

return nil
}

Expand Down
43 changes: 32 additions & 11 deletions invoices/invoiceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
// created by the daemon. The registry is a thin wrapper around a map in order
// to ensure that all updates/reads are thread safe.
type InvoiceRegistry struct {
started atomic.Bool
stopped atomic.Bool

sync.RWMutex

nextClientID uint32 // must be used atomically
Expand Down Expand Up @@ -213,33 +216,48 @@ func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {

// Start starts the registry and all goroutines it needs to carry out its task.
func (i *InvoiceRegistry) Start() error {
// Start InvoiceExpiryWatcher and prepopulate it with existing active
// invoices.
err := i.expiryWatcher.Start(func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(context.Background(), hash, force)
})
var err error

log.Info("InvoiceRegistry starting...")

if i.started.Swap(true) {
return fmt.Errorf("InvoiceRegistry started more than once")
}
// Start InvoiceExpiryWatcher and prepopulate it with existing
// active invoices.
err = i.expiryWatcher.Start(
func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(
context.Background(), hash, force,
)
})
if err != nil {
return err
}

log.Info("InvoiceRegistry starting")

i.wg.Add(1)
go i.invoiceEventLoop()

// Now scan all pending and removable invoices to the expiry watcher or
// delete them.
// Now scan all pending and removable invoices to the expiry
// watcher or delete them.
err = i.scanInvoicesOnStart(context.Background())
if err != nil {
_ = i.Stop()
return err
}

return nil
log.Info("InvoiceRegistry started")

return err
}

// Stop signals the registry for a graceful shutdown.
func (i *InvoiceRegistry) Stop() error {
log.Info("InvoiceRegistry shutting down...")

if i.stopped.Swap(true) {
return fmt.Errorf("InvoiceRegistry stopped more than once")
}

log.Info("InvoiceRegistry shutting down...")
defer log.Debug("InvoiceRegistry shutdown complete")

Expand All @@ -248,6 +266,9 @@ func (i *InvoiceRegistry) Stop() error {
close(i.quit)

i.wg.Wait()

log.Infof("InvoiceRegistry shutdown complete")

return nil
}

Expand Down
23 changes: 19 additions & 4 deletions sweep/fee_bumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ type TxPublisherConfig struct {
// until the tx is confirmed or the fee rate reaches the maximum fee rate
// specified by the caller.
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool

wg sync.WaitGroup

// cfg specifies the configuration of the TxPublisher.
Expand Down Expand Up @@ -669,7 +672,10 @@ type monitorRecord struct {
// off the monitor loop.
func (t *TxPublisher) Start() error {
log.Info("TxPublisher starting...")
defer log.Debugf("TxPublisher started")

if t.started.Swap(true) {
return fmt.Errorf("TxPublisher started more than once")
}

blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
Expand All @@ -679,17 +685,26 @@ func (t *TxPublisher) Start() error {
t.wg.Add(1)
go t.monitor(blockEvent)

log.Debugf("TxPublisher started")

return nil
}

// Stop stops the publisher and waits for the monitor loop to exit.
func (t *TxPublisher) Stop() {
func (t *TxPublisher) Stop() error {
log.Info("TxPublisher stopping...")
defer log.Debugf("TxPublisher stopped")

close(t.quit)
if t.stopped.Swap(true) {
return fmt.Errorf("TxPublisher stopped more than once")
}

close(t.quit)
t.wg.Wait()

log.Info("TxPublisher stopped")

return nil

}

// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
Expand Down

0 comments on commit 32d68b3

Please sign in to comment.