Skip to content

Commit

Permalink
fix(local pub sub): fix must subscribe to handle context cancelled (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt authored and omritoptix committed Aug 13, 2024
1 parent ee9c621 commit 1a495f3
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 21 deletions.
6 changes: 3 additions & 3 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ func (m *Manager) attemptApplyCachedBlocks() error {
for {
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache.GetBlockFromCache(expectedHeight)
cachedBlock, blockExists := m.blockCache.Get(expectedHeight)
if !blockExists {
break
}
if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil {
m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
m.blockCache.Delete(cachedBlock.Block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}
Expand All @@ -137,7 +137,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
}
m.logger.Info("Block applied", "height", expectedHeight)

m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
m.blockCache.Delete(cachedBlock.Block.Header.Height)
}

return nil
Expand Down
10 changes: 5 additions & 5 deletions block/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ type Cache struct {
cache map[uint64]types.CachedBlock
}

func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
func (m *Cache) Add(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source}
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) DeleteBlockFromCache(h uint64) {
func (m *Cache) Delete(h uint64) {
delete(m.cache, h)
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) GetBlockFromCache(h uint64) (types.CachedBlock, bool) {
func (m *Cache) Get(h uint64) (types.CachedBlock, bool) {
ret, found := m.cache[h]
return ret, found
}

func (m *Cache) HasBlockInCache(h uint64) bool {
_, found := m.GetBlockFromCache(h)
func (m *Cache) Has(h uint64) bool {
_, found := m.Get(h)
return found
}

Expand Down
4 changes: 2 additions & 2 deletions block/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *Manager) onReceivedBlock(event pubsub.Message) {
m.retrieverMu.Lock() // needed to protect blockCache access

// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if m.blockCache.HasBlockInCache(height) {
if m.blockCache.Has(height) {
m.retrieverMu.Unlock()
return
}
Expand All @@ -51,7 +51,7 @@ func (m *Manager) onReceivedBlock(event pubsub.Message) {

nextHeight := m.State.NextHeight()
if height >= nextHeight {
m.blockCache.AddBlockToCache(height, &block, &commit, source)
m.blockCache.Add(height, &block, &commit, source)
}
m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

Expand Down
2 changes: 1 addition & 1 deletion block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {

lastAppliedHeight = float64(block.Header.Height)

m.blockCache.DeleteBlockFromCache(block.Header.Height)
m.blockCache.Delete(block.Header.Height)
}
}
types.LastReceivedDAHeightGauge.Set(lastAppliedHeight)
Expand Down
14 changes: 7 additions & 7 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,14 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
Commits: make([]*types.Commit, 0, batchSize),
}

for height := startHeight; height <= endHeightInclusive; height++ {
block, err := m.Store.LoadBlock(height)
for h := startHeight; h <= endHeightInclusive; h++ {
block, err := m.Store.LoadBlock(h)
if err != nil {
return nil, fmt.Errorf("load block: height: %d: %w", height, err)
return nil, fmt.Errorf("load block: h: %d: %w", h, err)
}
commit, err := m.Store.LoadCommit(height)
commit, err := m.Store.LoadCommit(h)
if err != nil {
return nil, fmt.Errorf("load commit: height: %d: %w", height, err)
return nil, fmt.Errorf("load commit: h: %d: %w", h, err)
}

batch.Blocks = append(batch.Blocks, block)
Expand All @@ -196,8 +196,8 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
batch.Blocks = batch.Blocks[:len(batch.Blocks)-1]
batch.Commits = batch.Commits[:len(batch.Commits)-1]

if height == startHeight {
return nil, fmt.Errorf("block size exceeds max batch size: height %d: size: %d: %w", height, totalSize, gerrc.ErrOutOfRange)
if h == startHeight {
return nil, fmt.Errorf("block size exceeds max batch size: h %d: size: %d: %w", h, totalSize, gerrc.ErrOutOfRange)
}
break
}
Expand Down
10 changes: 7 additions & 3 deletions utils/event/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ func MustSubscribe(
logger types.Logger,
) {
subscription, err := pubsubServer.SubscribeUnbuffered(ctx, clientID, eventQuery)
if err != nil && !errors.Is(err, context.Canceled) {
logger.Error("subscribe to events")
panic(err)
if err != nil {
err = fmt.Errorf("subscribe unbuffered: %w", err)
if !errors.Is(err, context.Canceled) {
logger.Error("Must subscribe.", "err", err)
panic(err)
}
return
}

for {
Expand Down

0 comments on commit 1a495f3

Please sign in to comment.