feat(p2p): block sync protocol #915

Merged · 109 commits · Aug 8, 2024
Changes shown from 107 commits.

Commits
a738547  wip (srene, Jun 5, 2024)
f5aa307  wip (srene, Jun 7, 2024)
901a171  disc (srene, Jun 10, 2024)
4a137d7  added persistent p2p (srene, Jun 10, 2024)
5216e61  status code (srene, Jun 11, 2024)
141be8b  status and sync height (srene, Jun 11, 2024)
62b73f9  remove duplicate func (srene, Jun 11, 2024)
a325424  get block (srene, Jun 11, 2024)
738c809  block adv refresh (srene, Jun 12, 2024)
7fdab35  refactoring (srene, Jun 13, 2024)
6278a24  missing blocks (srene, Jun 13, 2024)
2d00c26  sync block manager (srene, Jun 13, 2024)
f417cee  refactor da fetching (srene, Jun 13, 2024)
c4e55a0  skip gossip received height (srene, Jun 13, 2024)
d2385d9  cleaning (srene, Jun 13, 2024)
fd5b295  lint fix (srene, Jun 13, 2024)
d918237  minor edits + godoc (srene, Jun 14, 2024)
43755a3  lint fix (srene, Jun 14, 2024)
81c2877  validator + godoc (srene, Jun 14, 2024)
c68ad0f  godoc update (srene, Jun 14, 2024)
027f385  fixing tests (srene, Jun 14, 2024)
9f007d3  adv fix (srene, Jun 16, 2024)
d3896fe  cleaning (srene, Jun 16, 2024)
8e598a2  lint fix (srene, Jun 16, 2024)
d25813a  test (srene, Jun 16, 2024)
3bd3363  store id test (srene, Jun 17, 2024)
b6e2c72  comments (srene, Jun 17, 2024)
f397170  add block on reception (srene, Jun 17, 2024)
982e4da  set initial height (srene, Jun 17, 2024)
650a5c5  Update store/store.go (srene, Jun 19, 2024)
f1d49b9  event data check pubsub (srene, Jun 19, 2024)
2306c65  error handling adding block to blocksync (srene, Jun 19, 2024)
63999ba  adding mutex to setappliedheight (srene, Jun 19, 2024)
e7a8886  Update store/store.go (srene, Jun 19, 2024)
0e1e6cd  fixing store cid func name (srene, Jun 19, 2024)
3339556  improving error message (srene, Jun 19, 2024)
20ad6aa  Update config/p2p.go (srene, Jun 19, 2024)
3abb47b  lint fix (srene, Jun 19, 2024)
481c82f  restore celestia retrieve (srene, Jun 19, 2024)
0fe88f3  rebase fix (srene, Jun 20, 2024)
a2c614c  addressing pr comments (srene, Jun 20, 2024)
7425ee8  fixing tests (srene, Jun 20, 2024)
c54ea2c  lint fix (srene, Jun 20, 2024)
5303d24  fixing logs (srene, Jun 25, 2024)
3a3eeab  addressing comments (srene, Jun 26, 2024)
a630584  addressing comments (srene, Jun 26, 2024)
377df06  refactor StartBlockSync (srene, Jun 26, 2024)
973f6d9  max in latestSeenHeight (srene, Jun 26, 2024)
b9f6da9  blocksync loop desc (srene, Jun 26, 2024)
53922d1  lint fix (srene, Jun 26, 2024)
b5c550b  docstring (srene, Jun 26, 2024)
854dbac  dht validator fix + test (srene, Jun 26, 2024)
c946279  lint fix (srene, Jun 26, 2024)
0319ec9  addressing comments (srene, Jun 28, 2024)
7d2b376  unify sync from da (srene, Jun 28, 2024)
a99b0c8  lint fix (srene, Jun 28, 2024)
ce779ff  pruning blocks blocksync store (srene, Jun 28, 2024)
b30495a  fix after rebase (srene, Jun 28, 2024)
3984c41  minor edits (srene, Jun 28, 2024)
9e6390d  sync mutex (srene, Jun 28, 2024)
172f06b  advertise dht retry time conf + minor edit (srene, Jun 28, 2024)
f19f39f  lint fix (srene, Jun 28, 2024)
5dfb529  block cache mu (srene, Jul 1, 2024)
ec22327  minor edit (srene, Jul 1, 2024)
c401a18  validate + doscstring (srene, Jul 1, 2024)
c94c312  addressing comments (srene, Jul 1, 2024)
e75fd83  pruning cids (srene, Jul 1, 2024)
71796ae  added pruning cid test (srene, Jul 1, 2024)
8078622  replace adv cids loop by single execution (srene, Jul 1, 2024)
6515e0d  fix (srene, Jul 2, 2024)
71d0d80  comment (srene, Jul 2, 2024)
d7c4dc2  lint (srene, Jul 2, 2024)
4126f25  addressing comments (srene, Jul 15, 2024)
6afe9b6  addressing comments (srene, Jul 16, 2024)
87cc763  pruning blocks blocksync wrapped (srene, Jul 16, 2024)
311270f  lint fix (srene, Jul 16, 2024)
4e23781  removing namespace testutil (srene, Jul 16, 2024)
a712250  addressing comments (srene, Jul 17, 2024)
2a33969  Update block/p2p.go (srene, Jul 19, 2024)
4d5d90e  var rename (srene, Jul 20, 2024)
571d87f  add disable option (srene, Jul 20, 2024)
6c3b07d  adding comments / go doc + some renaming (srene, Jul 20, 2024)
dfa73c3  lint fix + not returning error in block-sync pruning to avoid error i… (srene, Jul 20, 2024)
629c424  rename (srene, Jul 20, 2024)
2a6a27d  blocks received map refactor (srene, Jul 20, 2024)
b63787a  Update p2p/block_sync.go (srene, Jul 22, 2024)
c2fe48a  Update p2p/client.go (srene, Jul 22, 2024)
15bdd43  Update p2p/block_sync.go (srene, Jul 22, 2024)
f05d9c1  Update p2p/block_sync.go (srene, Jul 22, 2024)
dc839c3  renaming (srene, Jul 22, 2024)
0856963  Update p2p/block_sync.go (srene, Jul 22, 2024)
5fa92a4  Update p2p/block_sync.go (srene, Jul 22, 2024)
337eb7e  Update p2p/block_sync.go (srene, Jul 22, 2024)
3d04db4  Update p2p/client.go (srene, Jul 22, 2024)
3263cfa  Update p2p/client.go (srene, Jul 22, 2024)
e1a8e68  lint fix (srene, Jul 22, 2024)
26a6603  param name update (srene, Jul 22, 2024)
161760b  dont use gossipsub fork (srene, Jul 22, 2024)
b01ddaf  event type received error handling (srene, Jul 22, 2024)
927ca67  solving issues after rebase (srene, Aug 2, 2024)
bff1394  added missing test + log removed (srene, Aug 5, 2024)
e6db914  fix block source tag (srene, Aug 5, 2024)
21109fd  Update p2p/client.go (srene, Aug 6, 2024)
25f6eff  minor fix (srene, Aug 6, 2024)
16fe90c  minor fix (srene, Aug 6, 2024)
529fb15  minor fix (srene, Aug 6, 2024)
7150f44  blocks received moved to struct (srene, Aug 6, 2024)
7d866ff  Danwt/patch srene p2p block sync protocol merge main resolve conflict… (danwt, Aug 8, 2024)
c9d3723  Merge branch 'main' into srene/p2p-block-sync-protocol (danwt, Aug 8, 2024)
3 changes: 1 addition & 2 deletions block/block.go
@@ -90,7 +90,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
if err != nil {
return fmt.Errorf("update state: %w", err)
}

// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.pruneBlocks(uint64(retainHeight))
@@ -132,7 +131,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: types.GossipedBlock})
err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
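The change above threads block provenance through the cache: attemptApplyCachedBlocks previously hard-coded types.GossipedBlock and now reuses whatever source the block arrived with. A minimal sketch of the types this implies, inferred from the identifiers used in this diff (the authoritative definitions live in the types package and may differ):

type BlockSource uint64

const (
	Produced  BlockSource = iota // built by the local sequencer
	Gossiped                     // received via P2P gossipsub
	BlockSync                    // fetched via the P2P block-sync protocol
	LocalDb                      // replayed from the local store
	DA                           // retrieved from the data-availability layer
)

// CachedBlock carries its source so cached blocks are applied with the
// correct provenance tag rather than being assumed gossiped.
type CachedBlock struct {
	Block  *Block
	Commit *Commit
	Source BlockSource
}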
4 changes: 2 additions & 2 deletions block/block_cache.go
@@ -9,8 +9,8 @@ type Cache struct {
cache map[uint64]types.CachedBlock
}

func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c}
func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source}
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

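For illustration, a hypothetical call site (inside the block package) that caches an out-of-order block received over block-sync; the cache is keyed by height, and the gauge tracks its size:

// Assumes a block and commit just received via the block-sync protocol.
cache := &Cache{cache: make(map[uint64]types.CachedBlock)}
cache.AddBlockToCache(block.Header.Height, &block, &commit, types.BlockSync)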
81 changes: 39 additions & 42 deletions block/manager.go
@@ -8,7 +8,6 @@
"sync"
"sync/atomic"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

@@ -65,15 +64,15 @@
// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
// and incoming DA blocks, respectively.
retrieverMu sync.Mutex
Retriever da.BatchRetriever
// get the next target height to sync local state to
targetSyncHeight diodes.Diode
// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// Protect against syncing twice from DA in case new batch is posted but it did not finish to sync yet.
syncFromDaMu sync.Mutex
Retriever da.BatchRetriever
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache *Cache

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64
}

// NewManager creates new block Manager.
@@ -101,18 +100,17 @@
}

m := &Manager{
Pubsub: pubsub,
p2pClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
logger: logger,
Pubsub: pubsub,
p2pClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
logger: logger,
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
@@ -147,16 +145,6 @@
}
}

if !isSequencer {
// Fullnode loop can start before syncing from DA
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}

eg, ctx := errgroup.WithContext(ctx)

if isSequencer {
@@ -167,26 +155,33 @@
go func() {
bytesProducedC <- nBytes
}()
err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
eg.Go(func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
eg.Go(func() error {
return m.ProduceBlockLoop(ctx, bytesProducedC)
})

} else {
eg.Go(func() error {
return m.RetrieveLoop(ctx)
})
eg.Go(func() error {
return m.SyncToTargetHeightLoop(ctx)
})
}
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
go func() {
err := m.syncFromSettlement()
if err != nil {
m.logger.Error("sync block manager from settlement", "err", err)
}
// DA Sync. Subscribe to SL next batch events
go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
}()

go func() {
err := eg.Wait()
m.logger.Info("Block manager err group finished.", "err", err)
}()
// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)

}
return nil
}

@@ -208,21 +203,23 @@
return m.LastSubmittedHeight.Load() + 1
}

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager() error {
// syncFromSettlement enforces the node to be synced on initial run from SL and DA.
func (m *Manager) syncFromSettlement() error {
res, err := m.SLClient.GetLatestBatch()
if errors.Is(err, gerrc.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
m.LastSubmittedHeight.Store(res.EndHeight)
err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
if err != nil {
return err
}
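Taken together, the Start changes split startup by role: a sequencer must finish syncing from the settlement layer before producing, while a full node starts its loops immediately and runs the SL/DA sync in the background, since blocks can also arrive over P2P in parallel. A condensed sketch reconstructed from the hunks above (not the verbatim file):

if isSequencer {
	// Sequencer: must be fully synced before producing blocks.
	if err := m.syncFromSettlement(); err != nil {
		return fmt.Errorf("sync block manager from settlement: %w", err)
	}
	eg.Go(func() error { return m.SubmitLoop(ctx, bytesProducedC) })
	eg.Go(func() error { return m.ProduceBlockLoop(ctx, bytesProducedC) })
} else {
	eg.Go(func() error { return m.RetrieveLoop(ctx) })
	eg.Go(func() error { return m.SyncToTargetHeightLoop(ctx) })

	// DA sync runs in the background; P2P can deliver blocks meanwhile.
	go func() {
		if err := m.syncFromSettlement(); err != nil {
			m.logger.Error("sync block manager from settlement", "err", err)
		}
		go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop",
			settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
	}()

	// P2P sync: gossiped and block-sync blocks both funnel into onReceivedBlock.
	go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop",
		p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
	go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop",
		p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
}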
10 changes: 6 additions & 4 deletions block/manager_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -55,10 +56,11 @@ func TestInitialState(t *testing.T) {
// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, privKey, "TestChain", pubsubServer, logger)
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BlockSyncRequestIntervalTime: 30 * time.Second,
}, privKey, "TestChain", emptyStore, pubsubServer, datastore.NewMapDatastore(), logger)
assert.NoError(err)
assert.NotNil(p2pClient)

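The test now exercises the extended constructor: NewClient additionally takes a store and a libp2p datastore, and P2PConfig gains BlockSyncRequestIntervalTime. A hedged sketch of the config as this test uses it (the comment on field semantics is an assumption):

cfg := config.P2PConfig{
	ListenAddress:      config.DefaultListenAddress,
	GossipSubCacheSize: 50,
	BootstrapRetryTime: 30 * time.Second,
	// Presumably how often a node re-requests blocks it is still missing
	// from peers via the block-sync protocol.
	BlockSyncRequestIntervalTime: 30 * time.Second,
}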
40 changes: 32 additions & 8 deletions block/gossip.go → block/p2p.go
@@ -4,19 +4,40 @@ import (
"context"
"fmt"

"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

// onNewGossipedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
eventData, _ := event.Data().(p2p.GossipedBlock)
// onReceivedBlock receives a block received event from P2P, saves the block to a cache and tries to apply the blocks from the cache.
func (m *Manager) onReceivedBlock(event pubsub.Message) {
eventData, ok := event.Data().(p2p.P2PBlockEvent)
if !ok {
m.logger.Error("onReceivedBlock", "err", "wrong event data received")
return
}
var source types.BlockSource

if len(event.Events()[p2p.EventTypeKey]) != 1 {
m.logger.Error("onReceivedBlock", "err", "wrong number of event types received with the event", "received", len(event.Events()[p2p.EventTypeKey]))
return
}

switch event.Events()[p2p.EventTypeKey][0] {
case p2p.EventNewBlockSyncBlock:
source = types.BlockSync
case p2p.EventNewGossipedBlock:
source = types.Gossiped
default:
m.logger.Error("onReceivedBlock", "err", "wrong event type received", "type", event.Events()[p2p.EventTypeKey][0])
return
}

block := eventData.Block
commit := eventData.Commit
m.retrieverMu.Lock() // needed to protect blockCache access
height := block.Header.Height
m.retrieverMu.Lock() // needed to protect blockCache access

// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if m.blockCache.HasBlockInCache(height) {
m.retrieverMu.Unlock()
@@ -30,7 +51,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {

nextHeight := m.State.NextHeight()
if height >= nextHeight {
m.blockCache.AddBlockToCache(height, &block, &commit)
m.blockCache.AddBlockToCache(height, &block, &commit, source)
}
m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

@@ -40,8 +61,10 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
}
}

// gossipBlock sends created blocks by the sequencer to full-nodes using P2P gossipSub
func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
m.logger.Info("Gossipping block", "height", block.Header.Height)
gossipedBlock := p2p.P2PBlockEvent{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
Expand All @@ -51,5 +74,6 @@ func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit typ
// could cause that to fail, so we assume recoverable.
return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable)
}

return nil
}
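onReceivedBlock is deliberately source-agnostic: one handler serves both subscriptions, and provenance travels out-of-band in the pubsub event tags rather than in the payload. A hypothetical publication, using tendermint's pubsub, that this handler would accept (names taken from this diff; the exact call shape is an assumption):

// Tag the message so onReceivedBlock maps it to types.Gossiped.
events := map[string][]string{p2p.EventTypeKey: {p2p.EventNewGossipedBlock}}
err := pubsubServer.PublishWithEvents(ctx, p2p.P2PBlockEvent{Block: block, Commit: commit}, events)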
3 changes: 2 additions & 1 deletion block/produce.go
@@ -71,6 +71,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
}

bytesProducedN := block.SizeBytes() + commit.SizeBytes()
m.logger.Info("New block.", "size", uint64(block.ToProto().Size()))
select {
case <-ctx.Done():
return nil
@@ -181,7 +182,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er
}
}

if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.ProducedBlock}); err != nil {
if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.Produced}); err != nil {
return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable)
}

5 changes: 5 additions & 0 deletions block/pruning.go
@@ -1,6 +1,7 @@
package block

import (
"context"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
@@ -14,6 +15,10 @@ func (m *Manager) pruneBlocks(retainHeight uint64) error {
gerrc.ErrInvalidArgument)
}

err := m.p2pClient.RemoveBlocks(context.TODO(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning block-sync store", "retain_height", retainHeight, "err", err)
}
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return fmt.Errorf("prune block store: %w", err)
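Note the asymmetry in error handling here: failure to prune the block-sync store is logged and swallowed, while failure to prune the local store still aborts. This matches the commit "lint fix + not returning error in block-sync pruning to avoid error i…" and keeps a flaky P2P datastore from blocking height retention. A sketch of the intended pattern (context.TODO as in the diff):

// Best-effort: block-sync store pruning must not abort local pruning.
if err := m.p2pClient.RemoveBlocks(context.TODO(), m.State.BaseHeight, retainHeight); err != nil {
	m.logger.Error("pruning block-sync store", "retain_height", retainHeight, "err", err)
	// deliberately not returned: local store pruning below must still run
}
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
	return fmt.Errorf("prune block store: %w", err)
}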
41 changes: 19 additions & 22 deletions block/retriever.go
@@ -1,41 +1,38 @@
package block

import (
"context"
"errors"
"fmt"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

// RetrieveLoop listens for new target sync heights and then syncs the chain by
// fetching batches from the settlement layer and then fetching the actual blocks
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) (err error) {
m.logger.Info("Started retrieve loop.")
p := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx))

for {
targetHeight := p.Next() // We only care about the latest one
if targetHeight == nil {
return
}

if err = m.syncToTargetHeight(*(*uint64)(targetHeight)); err != nil {
err = fmt.Errorf("sync until target: %w", err)
return
}
// onNewStateUpdate will try to sync to new height, if not already synced
func (m *Manager) onNewStateUpdate(event pubsub.Message) {
eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted)
if !ok {
m.logger.Error("onReceivedBatch", "err", "wrong event data received")
return
}
h := eventData.EndHeight
m.UpdateTargetHeight(h)
err := m.syncToTargetHeight(h)
if err != nil {
m.logger.Error("sync until target", "err", err)
}
}

// syncToTargetHeight syncs blocks until the target height is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
defer m.syncFromDaMu.Unlock()
m.syncFromDaMu.Lock()
for currH := m.State.NextHeight(); currH <= targetHeight; currH = m.State.NextHeight() {
// if we have the block locally, we don't need to fetch it from the DA
err := m.applyLocalBlock(currH)
@@ -100,7 +97,7 @@ func (m *Manager) applyLocalBlock(height uint64) error {
}

m.retrieverMu.Lock()
err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDbBlock})
err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDb})
if err != nil {
return fmt.Errorf("apply block from local store: height: %d: %w", height, err)
}
@@ -113,7 +110,6 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
m.logger.Debug("trying to retrieve batch from DA", "daHeight", daMetaData.Height)
batchResp := m.fetchBatch(daMetaData)
if batchResp.Code != da.StatusSuccess {
m.logger.Error("fetching batch from DA", batchResp.Message)
return batchResp.Error
}

@@ -132,7 +128,8 @@
m.logger.Error("validate block from DA", "height", block.Header.Height, "err", err)
continue
}
err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DABlock, DAHeight: daMetaData.Height})

err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DA, DAHeight: daMetaData.Height})
if err != nil {
return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err)
}
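The retriever is now event-driven: onNewStateUpdate records the new settlement end height as the sync target and triggers a DA sync, with syncFromDaMu serializing concurrent attempts. UpdateTargetHeight itself is not shown in this diff; a plausible sketch, given the TargetHeight atomic.Uint64 field and the "max in latestSeenHeight" commit, is a monotonic maximum:

// Assumption: keeps the highest height seen from either P2P or the DA,
// using a compare-and-swap loop so concurrent updates never regress it.
func (m *Manager) UpdateTargetHeight(h uint64) {
	for {
		cur := m.TargetHeight.Load()
		if h <= cur || m.TargetHeight.CompareAndSwap(cur, h) {
			return
		}
	}
}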