Skip to content

Commit

Permalink
chore: Monitor/Add stop mechanism and fix linter (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
gitferry authored Jan 30, 2023
1 parent 3f36537 commit 1676ce1
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 63 deletions.
2 changes: 1 addition & 1 deletion btcclient/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (c *Client) getChainBlocks(baseHeight uint64, tipBlock *types.IndexedBlock)
chainBlocks := make([]*types.IndexedBlock, tipHeight-baseHeight)
chainBlocks[len(chainBlocks)-1] = tipBlock
prevHash := &tipBlock.Header.PrevBlock
for i := tipHeight - baseHeight - 1; i >= 0; i-- {
for i := int(tipHeight-baseHeight) - 1; i >= 0; i-- {
ib, mb, err := c.GetBlockByHash(prevHash)
if err != nil {
return nil, fmt.Errorf("failed to get block by hash %x: %w", prevHash, err)
Expand Down
5 changes: 4 additions & 1 deletion cmd/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
configFileNameFlag = "config"
genesisFileNameFlag = "genesis"

ConfigFileNameDefault = "monitor.yml"
ConfigFileNameDefault = "vigilante.yml"
GenesisFileNameDefault = "genesis.json"
)

Expand Down Expand Up @@ -89,6 +89,9 @@ func cmdFunc(cmd *cobra.Command, args []string) {
k,
babylonClient.GetTagIdx(),
)
if err != nil {
panic(fmt.Errorf("failed to create BTC scanner: %w", err))
}
// create monitor
vigilanteMonitor, err = monitor.New(&cfg.Monitor, genesisInfo, btcScanner, babylonClient)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions monitor/btcscanner/block_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
// blockEventHandler handles connected and disconnected blocks from the BTC client.
func (bs *BtcScanner) blockEventHandler() {
defer bs.wg.Done()
quit := bs.quitChan()

for {
select {
case <-bs.quit:
bs.BtcClient.Stop()
return
case event, open := <-bs.BtcClient.BlockEventChan():
if !open {
log.Errorf("Block event channel is closed")
Expand All @@ -35,9 +37,6 @@ func (bs *BtcScanner) blockEventHandler() {
bs.Bootstrap()
}
}
case <-quit:
// We have been asked to stop
return
}
}
}
Expand Down
88 changes: 51 additions & 37 deletions monitor/btcscanner/btc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ package btcscanner
import (
"fmt"
"github.com/babylonchain/babylon/btctxformatter"
"github.com/babylonchain/vigilante/netparams"
"github.com/btcsuite/btcd/wire"
"go.uber.org/atomic"
"sync"

ckpttypes "github.com/babylonchain/babylon/x/checkpointing/types"
"github.com/babylonchain/vigilante/btcclient"
"github.com/babylonchain/vigilante/config"
"github.com/babylonchain/vigilante/netparams"
"github.com/babylonchain/vigilante/types"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcd/wire"
"go.uber.org/atomic"
"sync"
)

type BtcScanner struct {
Expand All @@ -39,9 +37,8 @@ type BtcScanner struct {
Synced *atomic.Bool

wg sync.WaitGroup
started bool
Started *atomic.Bool
quit chan struct{}
quitMu sync.Mutex
}

func New(btcCfg *config.BTCConfig, monitorCfg *config.MonitorConfig, btcClient btcclient.BTCClient, btclightclientBaseHeight uint64, btcConfirmationDepth uint64, tagID uint8) (*BtcScanner, error) {
Expand All @@ -65,28 +62,51 @@ func New(btcCfg *config.BTCConfig, monitorCfg *config.MonitorConfig, btcClient b
blockHeaderChan: headersChan,
checkpointsChan: ckptsChan,
Synced: atomic.NewBool(false),
Started: atomic.NewBool(false),
quit: make(chan struct{}),
}, nil
}

// Start starts the scanning process from curBTCHeight to tipHeight
func (bs *BtcScanner) Start() {
bs.BtcClient.MustSubscribeBlocks()
if bs.Started.Load() {
log.Info("the BTC scanner is already started")
return
}

// the bootstrapping should not block the main thread
go bs.Bootstrap()
for {
block := bs.getNextConfirmedBlock()
// send the header to the Monitor for consistency check
bs.blockHeaderChan <- block.Header
ckptBtc := bs.tryToExtractCheckpoint(block)
if ckptBtc == nil {
log.Debugf("checkpoint not found at BTC block %v", block.Height)
// move to the next BTC block
continue
}
log.Infof("got a checkpoint at BTC block %v", block.Height)

bs.checkpointsChan <- ckptBtc
// move to the next BTC block
bs.BtcClient.MustSubscribeBlocks()

bs.Started.Store(true)
log.Info("the BTC scanner is started")

// start handling new blocks
bs.wg.Add(1)
go bs.blockEventHandler()

for bs.Started.Load() {
select {
case <-bs.quit:
bs.Started.Store(false)
case block := <-bs.ConfirmedBlocksChan:
// send the header to the Monitor for consistency check
bs.blockHeaderChan <- block.Header
ckptBtc := bs.tryToExtractCheckpoint(block)
if ckptBtc == nil {
log.Debugf("checkpoint not found at BTC block %v", block.Height)
// move to the next BTC block
continue
}
log.Infof("got a checkpoint at BTC block %v", block.Height)

bs.checkpointsChan <- ckptBtc
}
}

bs.wg.Wait()
log.Info("the BTC scanner is stopped")
}

// Bootstrap syncs with BTC by getting the confirmed blocks and the caching the unconfirmed blocks
Expand Down Expand Up @@ -138,11 +158,6 @@ func (bs *BtcScanner) Bootstrap() {
bs.sendConfirmedBlocksToChan(confirmedBlocks)
}

// getNextConfirmedBlock returns the next confirmed block from the channel
func (bs *BtcScanner) getNextConfirmedBlock() *types.IndexedBlock {
return <-bs.ConfirmedBlocksChan
}

func (bs *BtcScanner) GetHeadersChan() chan *wire.BlockHeader {
return bs.blockHeaderChan
}
Expand All @@ -155,7 +170,7 @@ func (bs *BtcScanner) sendConfirmedBlocksToChan(blocks []*types.IndexedBlock) {
}

func (bs *BtcScanner) tryToExtractCheckpoint(block *types.IndexedBlock) *types.CheckpointRecord {
found := bs.tryToExtractCkptSegment(block.Txs)
found := bs.tryToExtractCkptSegment(block)
if !found {
return nil
}
Expand All @@ -173,6 +188,9 @@ func (bs *BtcScanner) tryToExtractCheckpoint(block *types.IndexedBlock) *types.C
func (bs *BtcScanner) matchAndPop() (*types.CheckpointRecord, error) {
bs.ckptCache.Match()
ckptSegments := bs.ckptCache.PopEarliestCheckpoint()
if ckptSegments == nil {
return nil, nil
}
connectedBytes, err := btctxformatter.ConnectParts(bs.ckptCache.Version, ckptSegments.Segments[0].Data, ckptSegments.Segments[1].Data)
if err != nil {
return nil, fmt.Errorf("failed to connect two checkpoint parts: %w", err)
Expand All @@ -189,15 +207,15 @@ func (bs *BtcScanner) matchAndPop() (*types.CheckpointRecord, error) {
}, nil
}

func (bs *BtcScanner) tryToExtractCkptSegment(txs []*btcutil.Tx) bool {
func (bs *BtcScanner) tryToExtractCkptSegment(b *types.IndexedBlock) bool {
found := false
for _, tx := range txs {
for _, tx := range b.Txs {
if tx == nil {
continue
}

// cache the segment to ckptCache
ckptSeg := types.NewCkptSegment(bs.ckptCache.Tag, bs.ckptCache.Version, nil, tx)
ckptSeg := types.NewCkptSegment(bs.ckptCache.Tag, bs.ckptCache.Version, b, tx)
if ckptSeg != nil {
err := bs.ckptCache.AddSegment(ckptSeg)
if err != nil {
Expand All @@ -214,10 +232,6 @@ func (bs *BtcScanner) GetCheckpointsChan() chan *types.CheckpointRecord {
return bs.checkpointsChan
}

// quitChan atomically reads the quit channel.
func (bs *BtcScanner) quitChan() <-chan struct{} {
bs.quitMu.Lock()
c := bs.quit
bs.quitMu.Unlock()
return c
func (bs *BtcScanner) Stop() {
close(bs.quit)
}
2 changes: 1 addition & 1 deletion monitor/btcscanner/btc_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func FuzzBootStrap(f *testing.F) {
BtcClient: mockBtcClient,
BaseHeight: baseHeight,
K: k,
ConfirmedBlocksChan: make(chan *types.IndexedBlock, 0),
ConfirmedBlocksChan: make(chan *types.IndexedBlock),
UnconfirmedBlockCache: cache,
Synced: atomic.NewBool(false),
}
Expand Down
1 change: 1 addition & 0 deletions monitor/btcscanner/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ type Scanner interface {
Start()
GetCheckpointsChan() chan *types.CheckpointRecord
GetHeadersChan() chan *wire.BlockHeader
Stop()
}
12 changes: 9 additions & 3 deletions monitor/liveness_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,30 @@ import (
"github.com/babylonchain/vigilante/types"
)

func (m *Monitor) LivenessChecker() {
func (m *Monitor) runLivenessChecker() {
ticker := time.NewTicker(time.Duration(m.Cfg.LivenessCheckIntervalSeconds) * time.Second)

log.Infof("liveness checker is started, checking liveness every %d seconds", m.Cfg.LivenessCheckIntervalSeconds)

for {
for m.started.Load() {
select {
case <-m.quit:
m.wg.Done()
m.started.Store(false)
case <-ticker.C:
log.Debugf("next liveness check is in %d seconds", m.Cfg.LivenessCheckIntervalSeconds)
checkpoints := m.checkpointChecklist.GetAll()
for _, c := range checkpoints {
err := m.CheckLiveness(c)
if err != nil {
// TODO decide what to do with this error, sending an alarm?
panic(fmt.Errorf("the checkpoint %x at epoch %v is detected being censored: %w", c.ID(), c.EpochNum(), err))
log.Errorf("the checkpoint %x at epoch %v is detected being censored: %s", c.ID(), c.EpochNum(), err.Error())
}
}
}
}

log.Info("the liveness checker is stopped")
}

// CheckLiveness checks whether the Babylon node is under liveness attack with the following steps
Expand Down
51 changes: 42 additions & 9 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package monitor
import (
"fmt"
"github.com/btcsuite/btcd/wire"
"go.uber.org/atomic"
"sort"
"sync"

checkpointingtypes "github.com/babylonchain/babylon/x/checkpointing/types"
bbnclient "github.com/babylonchain/rpc-client/client"
Expand All @@ -28,6 +30,10 @@ type Monitor struct {

// tracks checkpoint records that have not been reported back to Babylon
checkpointChecklist *types.CheckpointsBookkeeper

wg sync.WaitGroup
started *atomic.Bool
quit chan struct{}
}

func New(cfg *config.MonitorConfig, genesisInfo *types.GenesisInfo, scanner btcscanner.Scanner, babylonClient bbnclient.BabylonClient) (*Monitor, error) {
Expand Down Expand Up @@ -55,20 +61,35 @@ func New(cfg *config.MonitorConfig, genesisInfo *types.GenesisInfo, scanner btcs
Cfg: cfg,
curEpoch: genesisEpoch,
checkpointChecklist: types.NewCheckpointsBookkeeper(),
quit: make(chan struct{}),
started: atomic.NewBool(false),
}, nil
}

// Start starts the verification core
func (m *Monitor) Start() {
go m.BTCScanner.Start()
if m.started.Load() {
log.Info("the Monitor is already started")
return
}

m.started.Store(true)
log.Info("the Monitor is started")

// starting BTC scanner
m.wg.Add(1)
go m.runBTCScanner()

if m.Cfg.LivenessChecker {
go m.LivenessChecker()
// starting liveness checker
m.wg.Add(1)
go m.runLivenessChecker()
}

log.Info("the Monitor is started")
for {
for m.started.Load() {
select {
case <-m.quit:
m.started.Store(false)
case header := <-m.BTCScanner.GetHeadersChan():
err := m.handleNewConfirmedHeader(header)
if err != nil {
Expand All @@ -80,12 +101,18 @@ func (m *Monitor) Start() {
if err != nil {
log.Errorf("failed to handler BTC raw checkpoint at epoch %d: %s", ckpt.EpochNum(), err.Error())
}

}
}

m.wg.Wait()
log.Info("the Monitor is stopped")
}

func (m *Monitor) runBTCScanner() {
m.BTCScanner.Start()
m.wg.Done()
}

// TODO add stalling check where the header chain is behind the BTC canonical chain by W heights
func (m *Monitor) handleNewConfirmedHeader(header *wire.BlockHeader) error {
return m.checkHeaderConsistency(header)
}
Expand All @@ -94,6 +121,11 @@ func (m *Monitor) handleNewConfirmedCheckpoint(ckpt *types.CheckpointRecord) err
err := m.verifyCheckpoint(ckpt.RawCheckpoint)
if err != nil {
if sdkerrors.IsOf(err, types.ErrInconsistentLastCommitHash) {
// also record conflicting checkpoints since we need to ensure that
// alarm will be sent if conflicting checkpoints are censored
if m.Cfg.LivenessChecker {
m.addCheckpointToCheckList(ckpt)
}
// stop verification if a valid BTC checkpoint on an inconsistent LastCommitHash is found
// this means the ledger is on a fork
return fmt.Errorf("verification failed at epoch %v: %w", m.GetCurrentEpoch(), err)
Expand Down Expand Up @@ -157,7 +189,7 @@ func (m *Monitor) checkHeaderConsistency(header *wire.BlockHeader) error {
}

func GetSortedValSet(valSet checkpointingtypes.ValidatorWithBlsKeySet) checkpointingtypes.ValidatorWithBlsKeySet {
sort.Slice(valSet, func(i, j int) bool {
sort.Slice(valSet.ValSet, func(i, j int) bool {
addri, err := sdk.ValAddressFromBech32(valSet.ValSet[i].ValidatorAddress)
if err != nil {
panic(fmt.Errorf("failed to parse validator address %v: %w", valSet.ValSet[i].ValidatorAddress, err))
Expand All @@ -173,7 +205,8 @@ func GetSortedValSet(valSet checkpointingtypes.ValidatorWithBlsKeySet) checkpoin
return valSet
}

// Stop signals all vigilante goroutines to shutdown.
// Stop signals all vigilante goroutines to shut down.
func (m *Monitor) Stop() {
panic("implement graceful stop")
close(m.quit)
m.BTCScanner.Stop()
}
4 changes: 0 additions & 4 deletions types/ckpt_bookkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func (cb *CheckpointsBookkeeper) Exists(id string) bool {
return exists
}

func (cb *CheckpointsBookkeeper) size() int {
return len(cb.checkpointRecords)
}

func (cb *CheckpointsBookkeeper) GetAll() []*CheckpointRecord {
cb.Lock()
defer cb.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions types/ckpt_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func (c *CheckpointCache) PopEarliestCheckpoint() *Ckpt {
ckpt := c.Checkpoints[0]
c.Checkpoints = c.Checkpoints[1:]
return ckpt
} else {
return nil
}

return nil
}

func (c *CheckpointCache) NumSegments() int {
Expand Down
Loading

0 comments on commit 1676ce1

Please sign in to comment.