Skip to content

Commit

Permalink
fix: Monitor/Fix deadlock bugs in bootstrapping (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
gitferry authored Feb 1, 2023
1 parent 1676ce1 commit fc7a384
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 37 deletions.
12 changes: 10 additions & 2 deletions btcclient/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,18 @@ func (c *Client) getChainBlocks(baseHeight uint64, tipBlock *types.IndexedBlock)
return nil, fmt.Errorf("the tip block height %v is less than the base height %v", tipHeight, baseHeight)
}

chainBlocks := make([]*types.IndexedBlock, tipHeight-baseHeight)
// the returned blocks include the block at the base height and the tip block
chainBlocks := make([]*types.IndexedBlock, tipHeight-baseHeight+1)
chainBlocks[len(chainBlocks)-1] = tipBlock

if tipHeight == baseHeight {
return chainBlocks, nil
}

prevHash := &tipBlock.Header.PrevBlock
for i := int(tipHeight-baseHeight) - 1; i >= 0; i-- {
// minus 2 is because the tip block is already put in the last position of the slice,
// and it is ensured that the length of chainBlocks is more than 1
for i := len(chainBlocks) - 2; i >= 0; i-- {
ib, mb, err := c.GetBlockByHash(prevHash)
if err != nil {
return nil, fmt.Errorf("failed to get block by hash %x: %w", prevHash, err)
Expand Down
4 changes: 0 additions & 4 deletions cmd/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ func cmdFunc(cmd *cobra.Command, args []string) {
panic(fmt.Errorf("failed to open Babylon client: %w", err))
}

btccParams := babylonClient.MustQueryBTCCheckpointParams()
k := btccParams.BtcConfirmationDepth

genesisInfo, err := types.GetGenesisInfoFromFile(genesisFile)
if err != nil {
panic(fmt.Errorf("failed to read genesis file: %w", err))
Expand All @@ -86,7 +83,6 @@ func cmdFunc(cmd *cobra.Command, args []string) {
&cfg.Monitor,
btcClient,
genesisInfo.GetBaseBTCHeight(),
k,
babylonClient.GetTagIdx(),
)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions config/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
defaultCheckpointBufferSize = 100
defaultBtcBlockBufferSize = 100
defaultBtcCacheSize = 100
defaultBtcConfirmationDepth = 6
defaultLivenessCheckIntervalSeconds = 10
defaultMaxLiveBtcHeights = 100
)
Expand All @@ -24,6 +25,8 @@ type MonitorConfig struct {
LivenessCheckIntervalSeconds uint64 `mapstructure:"liveness-check-interval-seconds"`
// Max lasting BTC heights that a checkpoint is not reported before an alarm is sent
MaxLiveBtcHeights uint64 `mapstructure:"max-live-btc-heights"`
// the confirmation depth to consider a BTC block as confirmed
BtcConfirmationDepth uint64 `mapstructure:"btc-confirmation-depth"`
// whether to enable liveness checker
LivenessChecker bool `mapstructure:"liveness-checker"`
}
Expand All @@ -35,6 +38,9 @@ func (cfg *MonitorConfig) Validate() error {
if cfg.BtcCacheSize < defaultBtcCacheSize {
return fmt.Errorf("btc-cache-size should not be less than %v", defaultCheckpointBufferSize)
}
if cfg.BtcConfirmationDepth < defaultBtcConfirmationDepth {
return fmt.Errorf("btc-confirmation-depth should not be less than %d", defaultBtcConfirmationDepth)
}
return nil
}

Expand All @@ -44,6 +50,7 @@ func DefaultMonitorConfig() MonitorConfig {
BtcBlockBufferSize: defaultBtcBlockBufferSize,
BtcCacheSize: defaultBtcCacheSize,
LivenessCheckIntervalSeconds: defaultLivenessCheckIntervalSeconds,
BtcConfirmationDepth: defaultBtcConfirmationDepth,
MaxLiveBtcHeights: defaultMaxLiveBtcHeights,
LivenessChecker: true,
}
Expand Down
12 changes: 7 additions & 5 deletions monitor/btcscanner/block_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ func (bs *BtcScanner) blockEventHandler() {
if event.EventType == types.BlockConnected {
err := bs.handleConnectedBlocks(event)
if err != nil {
log.Warnf("failed to handle a connected block at height %d: %s,"+
log.Warnf("failed to handle a connected block at height %d: %s, "+
"need to restart the bootstrapping process", event.Height, err.Error())
bs.Synced.Store(false)
bs.Bootstrap()
if bs.Synced.Swap(false) {
bs.Bootstrap()
}
}
} else if event.EventType == types.BlockDisconnected {
err := bs.handleDisconnectedBlocks(event)
if err != nil {
log.Warnf("failed to handle a disconnected block at height %d: %s,"+
"need to restart the bootstrapping process", event.Height, err.Error())
bs.Synced.Store(false)
bs.Bootstrap()
if bs.Synced.Swap(false) {
bs.Bootstrap()
}
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions monitor/btcscanner/btc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type BtcScanner struct {
quit chan struct{}
}

func New(btcCfg *config.BTCConfig, monitorCfg *config.MonitorConfig, btcClient btcclient.BTCClient, btclightclientBaseHeight uint64, btcConfirmationDepth uint64, tagID uint8) (*BtcScanner, error) {
func New(btcCfg *config.BTCConfig, monitorCfg *config.MonitorConfig, btcClient btcclient.BTCClient, btclightclientBaseHeight uint64, tagID uint8) (*BtcScanner, error) {
bbnParam := netparams.GetBabylonParams(btcCfg.NetParams, tagID)
headersChan := make(chan *wire.BlockHeader, monitorCfg.BtcBlockBufferSize)
confirmedBlocksChan := make(chan *types.IndexedBlock, monitorCfg.BtcBlockBufferSize)
Expand All @@ -55,7 +55,7 @@ func New(btcCfg *config.BTCConfig, monitorCfg *config.MonitorConfig, btcClient b
return &BtcScanner{
BtcClient: btcClient,
BaseHeight: btclightclientBaseHeight,
K: btcConfirmationDepth,
K: monitorCfg.BtcConfirmationDepth,
ckptCache: ckptCache,
UnconfirmedBlockCache: unconfirmedBlockCache,
ConfirmedBlocksChan: confirmedBlocksChan,
Expand Down Expand Up @@ -91,6 +91,7 @@ func (bs *BtcScanner) Start() {
case <-bs.quit:
bs.Started.Store(false)
case block := <-bs.ConfirmedBlocksChan:
log.Debugf("found a confirmed BTC block at height %d", block.Height)
// send the header to the Monitor for consistency check
bs.blockHeaderChan <- block.Header
ckptBtc := bs.tryToExtractCheckpoint(block)
Expand All @@ -99,7 +100,7 @@ func (bs *BtcScanner) Start() {
// move to the next BTC block
continue
}
log.Infof("got a checkpoint at BTC block %v", block.Height)
log.Infof("found a checkpoint at BTC block %d", ckptBtc.FirstSeenBtcHeight)

bs.checkpointsChan <- ckptBtc
}
Expand Down Expand Up @@ -137,13 +138,16 @@ func (bs *BtcScanner) Bootstrap() {
panic(fmt.Errorf("failed to find the tail chain with base height %d: %w", bs.BaseHeight, err))
}

// replace all the unconfirmed blocks in the cache with new blocks to avoid forks
bs.UnconfirmedBlockCache.RemoveAll()
err = bs.UnconfirmedBlockCache.Init(chainBlocks)
if err != nil {
panic(fmt.Errorf("failed to initialize BTC cache for tail blocks: %w", err))
}

confirmedBlocks = bs.UnconfirmedBlockCache.TrimConfirmedBlocks(int(bs.K))
if confirmedBlocks == nil {
log.Debug("bootstrapping is finished but no confirmed blocks are found")
return
}

Expand All @@ -156,6 +160,7 @@ func (bs *BtcScanner) Bootstrap() {
}

bs.sendConfirmedBlocksToChan(confirmedBlocks)
log.Infof("bootstrapping is finished at the tip confirmed height: %d and tip unconfirmed height: %d", bs.confirmedTipBlock.Height, chainBlocks[len(chainBlocks)-1].Height)
}

func (bs *BtcScanner) GetHeadersChan() chan *wire.BlockHeader {
Expand All @@ -165,8 +170,8 @@ func (bs *BtcScanner) GetHeadersChan() chan *wire.BlockHeader {
func (bs *BtcScanner) sendConfirmedBlocksToChan(blocks []*types.IndexedBlock) {
for i := 0; i < len(blocks); i++ {
bs.ConfirmedBlocksChan <- blocks[i]
bs.confirmedTipBlock = blocks[i]
}
bs.confirmedTipBlock = blocks[len(blocks)-1]
}

func (bs *BtcScanner) tryToExtractCheckpoint(block *types.IndexedBlock) *types.CheckpointRecord {
Expand Down
17 changes: 9 additions & 8 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,18 @@ func (m *Monitor) Start() {
for m.started.Load() {
select {
case <-m.quit:
log.Info("the monitor is stopping")
m.started.Store(false)
case header := <-m.BTCScanner.GetHeadersChan():
err := m.handleNewConfirmedHeader(header)
if err != nil {
log.Errorf("failed to handle BTC header %x: %s", header.BlockHash(), err.Error())
log.Errorf("failed to handle BTC header: %s", err.Error())
break
}
case ckpt := <-m.BTCScanner.GetCheckpointsChan():
err := m.handleNewConfirmedCheckpoint(ckpt)
if err != nil {
log.Errorf("failed to handler BTC raw checkpoint at epoch %d: %s", ckpt.EpochNum(), err.Error())
log.Errorf("failed to handle BTC raw checkpoint at epoch %d: %s", ckpt.EpochNum(), err.Error())
}
}
}
Expand All @@ -118,6 +119,7 @@ func (m *Monitor) handleNewConfirmedHeader(header *wire.BlockHeader) error {
}

func (m *Monitor) handleNewConfirmedCheckpoint(ckpt *types.CheckpointRecord) error {

err := m.verifyCheckpoint(ckpt.RawCheckpoint)
if err != nil {
if sdkerrors.IsOf(err, types.ErrInconsistentLastCommitHash) {
Expand All @@ -140,10 +142,11 @@ func (m *Monitor) handleNewConfirmedCheckpoint(ckpt *types.CheckpointRecord) err
}

log.Infof("checkpoint at epoch %v has passed the verification", m.GetCurrentEpoch())

nextEpochNum := m.GetCurrentEpoch() + 1
err = m.updateEpochInfo(nextEpochNum)
if err != nil {
return fmt.Errorf("cannot get information at epoch %v: %w", nextEpochNum, err)
return fmt.Errorf("failed to update information of epoch %d: %w", nextEpochNum, err)
}

return nil
Expand Down Expand Up @@ -175,14 +178,12 @@ func (m *Monitor) updateEpochInfo(epoch uint64) error {
func (m *Monitor) checkHeaderConsistency(header *wire.BlockHeader) error {
btcHeaderHash := header.BlockHash()

log.Debugf("header for consistency check, hash %x", btcHeaderHash)

consistent, err := m.BBNQuerier.ContainsBTCHeader(&btcHeaderHash)
contains, err := m.BBNQuerier.ContainsBTCHeader(&btcHeaderHash)
if err != nil {
return err
}
if !consistent {
return fmt.Errorf("BTC header %x does not exists on Babylon BTC light client", btcHeaderHash)
if !contains {
return fmt.Errorf("BTC header %x does not exist on Babylon BTC light client", btcHeaderHash)
}

return nil
Expand Down
5 changes: 4 additions & 1 deletion monitor/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ func (q *Querier) QueryInfoForNextEpoch(epoch uint64) (*types.EpochInfo, error)
}
valSet[i] = val
}
// TODO should not query checkpoint here;
// instead, it should be queried when verification
// if the checkpoint is not confirmed, buffer it
// query checkpoint
ckpt, err := q.babylonCli.QueryRawCheckpoint(epoch)
if err != nil {
return nil, fmt.Errorf("failed to query raw checkpoint fro epoch %v: %w", epoch, err)
return nil, fmt.Errorf("failed to query raw checkpoint of epoch %v: %w", epoch, err)
}
if ckpt.Ckpt.EpochNum != epoch {
return nil, fmt.Errorf("the checkpoint is not at the desired epoch number, wanted: %v, got: %v", epoch, ckpt.Ckpt.EpochNum)
Expand Down
7 changes: 4 additions & 3 deletions sample-vigilante-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ submitter:
reporter:
netparams: simnet
monitor:
checkpoint-buffer-size: 100
btc-block-buffer-size: 100
btc-cache-size: 100
checkpoint-buffer-size: 1000
btc-block-buffer-size: 1000
btc-cache-size: 1000
btc-confirmation-depth: 6
liveness-check-interval-seconds: 100
max-live-btc-heights: 100
liveness-checker: true
7 changes: 4 additions & 3 deletions sample-vigilante.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ submitter:
reporter:
netparams: simnet
monitor:
checkpoint-buffer-size: 100
btc-block-buffer-size: 100
btc-cache-size: 100
checkpoint-buffer-size: 1000
btc-block-buffer-size: 1000
btc-cache-size: 1000
btc-confirmation-depth: 6
liveness-check-interval-seconds: 100
max-live-btc-heights: 100
liveness-checker: true
12 changes: 10 additions & 2 deletions types/btccache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ func (b *BTCCache) RemoveLast() error {
return nil
}

// RemoveAll deletes all the blocks in cache.
// It acquires the cache's lock via b.Lock() and releases it on return.
func (b *BTCCache) RemoveAll() {
b.Lock()
defer b.Unlock()

// Truncate to length zero by re-slicing instead of assigning a new value.
// NOTE(review): if blocks is a slice, the existing backing storage (and the
// entries it still references) is kept until overwritten — confirm that is intended.
b.blocks = b.blocks[:0]
}

// Size returns the size of the cache. Thread-safe.
func (b *BTCCache) Size() uint64 {
b.RLock()
Expand Down Expand Up @@ -135,8 +143,8 @@ func (b *BTCCache) GetAllBlocks() []*IndexedBlock {
// TrimConfirmedBlocks keeps the last <=k blocks in the cache and returns the rest in the same order
// the returned blocks are considered confirmed
func (b *BTCCache) TrimConfirmedBlocks(k int) []*IndexedBlock {
b.RLock()
defer b.RLock()
b.Lock()
defer b.Unlock()

l := len(b.blocks)
if l <= k {
Expand Down
7 changes: 2 additions & 5 deletions types/ckpt_bookkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (cb *CheckpointsBookkeeper) Add(cr *CheckpointRecord) {

id := cr.ID()

if !cb.Exists(id) {
if !cb.has(id) {
cb.checkpointRecords[id] = cr
} else {
// replace with the older one if the checkpoint id exists
Expand All @@ -43,10 +43,7 @@ func (cb *CheckpointsBookkeeper) Remove(id string) {
delete(cb.checkpointRecords, id)
}

func (cb *CheckpointsBookkeeper) Exists(id string) bool {
cb.Lock()
defer cb.Unlock()

// has reports whether checkpointRecords contains an entry for the given id.
// It performs no locking of its own.
// NOTE(review): callers such as Add presumably already hold the bookkeeper's
// lock before calling this — confirm against the call sites.
func (cb *CheckpointsBookkeeper) has(id string) bool {
// Only the presence flag of the lookup is used; the stored record is discarded.
_, exists := cb.checkpointRecords[id]
return exists
}
Expand Down

0 comments on commit fc7a384

Please sign in to comment.