Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(da): RetrieveBatchesV2 method #937

Draft
wants to merge 45 commits into
base: kirill/interchain-da
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
99b89e1
feat(da): RetrieveBatchesV2 method
keruch Jul 3, 2024
9f6c2fb
feat(da): adjustments after manual testing
keruch Jul 4, 2024
7cb2843
feat(da): added interchain-da proto contracts (#932)
keruch Jul 4, 2024
b09c75a
fix(rpc): panic and publish health event only on create batch error (…
zale144 Jul 9, 2024
2a5992c
feat(blockmanager): removed namespace from blockmanager (#943)
mtsitrin Jul 14, 2024
6a33534
fix(blockManager): multiple accumulateddata trigger (#960)
mtsitrin Jul 18, 2024
41d6053
fix(manager): Add start height to Submit batch to SL log message (#964)
zale144 Jul 18, 2024
9fa4109
fix(prune): fix guard for sequencer (#966)
danwt Jul 19, 2024
cce7cd3
fix(doc): remove misleading comment on last submitted height field (#…
danwt Jul 19, 2024
9da82f1
fix(sync): adds missing error log (#965)
danwt Jul 19, 2024
a9cbe2d
chore(code standards): fix wrong godoc for struct field (#976)
yuhangcangqian Jul 19, 2024
0d3be11
fix(p2p): set gossipsub buffersize to avoid missed blocks (#975)
srene Jul 22, 2024
242acb7
fix(submission): fix counting and time (#969)
danwt Jul 22, 2024
e922dea
feat(da): add metric for consecutive failed da submissions (#986)
srene Jul 26, 2024
c5f8f07
feat(rpc): Add sync info metrics (#979)
zale144 Jul 28, 2024
d57d5b7
feat(da): add default retry value for celestia (#985)
srene Jul 28, 2024
9e3201d
chore: add Initial changelog (#990)
hoangdv2429 Jul 30, 2024
402dd00
fix(rpc): Fix status `CatchingUp` field updating (#971)
zale144 Jul 31, 2024
83078f5
tests(pruning): Add pruning unit-test (#996)
mtsitrin Aug 6, 2024
93905ee
hotfix(submit): early catch and log for empty batch (#997)
danwt Aug 7, 2024
0bbe5be
fix(submit loop): add more logging around skew calculation (#1000)
danwt Aug 8, 2024
4c9c2c3
feat(p2p): block sync protocol (#915)
srene Aug 8, 2024
01fc2d5
fix(local pub sub): fix must subscribe to handle context cancelled (#…
danwt Aug 8, 2024
d3b4311
fix(test): submit loop test uses no op logger and prints more diagnos…
danwt Aug 9, 2024
603c160
refactor(p2p): rename p2p block event (#1006)
srene Aug 12, 2024
f3e69a3
fix(submit loop): correctly load commit on startup (#1011)
danwt Aug 12, 2024
fab177a
fix(manager): full-node syncing fix (#1013)
srene Aug 12, 2024
d31bcf1
code-standards(submit loop): small refactor to submit loop to move ti…
danwt Aug 13, 2024
7ee8303
fix(manager): unsubmitted bytes for batch calculation fix (#1019)
srene Aug 13, 2024
fd3cf3c
feat(da): submitted batch size metric (#1020)
srene Aug 15, 2024
25c069c
fix(p2p): improve blocksync logs (#1030)
srene Aug 21, 2024
88ba1fe
feat: sequencer rotation (#992)
mtsitrin Aug 22, 2024
54e2fa5
feat: rollapp consensus params (#991)
srene Aug 24, 2024
9aff932
feat: submit timestamps in block descriptors (#1032)
spoo-bar Aug 24, 2024
fe0f3cb
fix(manager): applylocalblock change mutex (#1036)
srene Aug 27, 2024
6bf24d0
fix(rpc): fix websocket subscription panic when no closing error (#1046)
srene Aug 27, 2024
e9709f6
fix(manager): gossip any pending blocks not gossiped before (in case …
srene Aug 28, 2024
f745143
feat(da): upgrade for new celestia-openrpc version for celestia-node …
srene Aug 28, 2024
562f673
fix(p2p): update height in blocksync loop (#1035)
srene Aug 28, 2024
d348ff0
fix(da): celestia openrpc version update (#1056)
srene Aug 30, 2024
5de7192
fix(manager): use block params from consensus param (#1042)
srene Sep 6, 2024
2da988f
refactor(manager): rename `rollappConsensusParams` to `rollappParams…
srene Sep 6, 2024
6f910b0
temp
keruch Sep 13, 2024
d3d58ff
Merge branch 'main' into kirill/interchain-da-retrieve-batch
keruch Sep 13, 2024
de50e94
merge commit
keruch Sep 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ packages:
github.com/dymensionxyz/dymint/settlement/dymension:
interfaces:
CosmosClient:
github.com/dymensionxyz/dymension/v3/x/sequencer/types:
github.com/dymensionxyz/dymint/third_party/dymension/sequencer/types:
interfaces:
QueryClient:
github.com/dymensionxyz/dymension/v3/x/rollapp/types:
github.com/dymensionxyz/dymint/third_party/dymension/rollapp/types:
interfaces:
QueryClient:
github.com/tendermint/tendermint/abci/types:
Expand All @@ -32,5 +32,10 @@ packages:
github.com/dymensionxyz/dymint/da:
interfaces:
DataAvailabilityLayerClient:
github.com/dymensionxyz/dymint/p2p:
interfaces:
GetProposerI:




91 changes: 27 additions & 64 deletions CHANGELOG.md

Large diffs are not rendered by default.

130 changes: 74 additions & 56 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,88 +14,110 @@ import (
// As the entire process can't be atomic we need to make sure the following condition apply before
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
var retainHeight int64

// TODO: add switch case to have defined behavior for each case.
// validate block height
if block.Header.Height != m.State.NextHeight() {
return types.ErrInvalidBlockHeight
}

m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.source)
types.SetLastAppliedBlockSource(blockMetaData.Source.String())

m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.Source.String())

// Check if the app's last block height is the same as the currently produced block height
isBlockAlreadyApplied, err := m.isHeightAlreadyApplied(block.Header.Height)
if err != nil {
return fmt.Errorf("check if block is already applied: %w", err)
}
// In case the following true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
// In case the following true, it means we crashed after the app commit but before updating the state
// In that case we'll want to align the state with the app commit result, as if the block was applied.
if isBlockAlreadyApplied {
// In this case, where the app was committed, but the state wasn't updated
// it will update the state from appInfo, saved responses and validators.
err := m.UpdateStateFromApp()
if err != nil {
return fmt.Errorf("update state from app: %w", err)
}
m.logger.Debug("Aligned with app state required. Skipping to next block", "height", block.Header.Height)
return nil
}
// Start applying the block assuming no inconsistency was found.
_, err = m.Store.SaveBlock(block, commit, nil)
if err != nil {
return fmt.Errorf("save block: %w", err)
}
m.logger.Info("updated state from app commit", "height", block.Header.Height)
} else {
var appHash []byte
// Start applying the block assuming no inconsistency was found.
_, err = m.Store.SaveBlock(block, commit, nil)
if err != nil {
return fmt.Errorf("save block: %w", err)
}

responses, err := m.Executor.ExecuteBlock(m.State, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}
err := m.saveP2PBlockToBlockSync(block, commit)
if err != nil {
m.logger.Error("save block blocksync", "err", err)
}

dbBatch := m.Store.NewBatch()
dbBatch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, dbBatch)
if err != nil {
dbBatch.Discard()
return fmt.Errorf("save block responses: %w", err)
}
responses, err := m.Executor.ExecuteBlock(m.State, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}

_, err = m.Store.SaveBlockResponses(block.Header.Height, responses, nil)
if err != nil {
return fmt.Errorf("save block responses: %w", err)
}

// Get the validator changes from the app
validators := m.State.NextValidators.Copy() // TODO: this will be changed when supporting multiple sequencers from the hub
// Commit block to app
appHash, retainHeight, err = m.Executor.Commit(m.State, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}

dbBatch, err = m.Store.SaveValidators(block.Header.Height, validators, dbBatch)
if err != nil {
dbBatch.Discard()
return fmt.Errorf("save validators: %w", err)
// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
}

err = dbBatch.Commit()
// check if the proposer needs to be changed
switchRole := m.Executor.UpdateProposerFromBlock(m.State, block)

// save sequencers to store to be queried over RPC
batch := m.Store.NewBatch()
batch, err = m.Store.SaveSequencers(block.Header.Height, &m.State.Sequencers, batch)
if err != nil {
return fmt.Errorf("commit batch to disk: %w", err)
return fmt.Errorf("save sequencers: %w", err)
}

// Commit block to app
appHash, retainHeight, err := m.Executor.Commit(m.State, block, responses)
batch, err = m.Store.SaveState(m.State, batch)
if err != nil {
return fmt.Errorf("commit block: %w", err)
return fmt.Errorf("update state: %w", err)
}

// If failed here, after the app committed, but before the state is updated, we'll update the state on
// UpdateStateFromApp using the saved responses and validators.

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, validators)
_, err = m.Store.SaveState(m.State, nil)
err = batch.Commit()
if err != nil {
return fmt.Errorf("update state: %w", err)
return fmt.Errorf("commit state: %w", err)
}

types.RollappHeightGauge.Set(float64(block.Header.Height))

// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.pruneBlocks(uint64(retainHeight))
err = m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
}

m.blockCache.Delete(block.Header.Height)

if switchRole {
// TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008)
m.logger.Info("Node changing to proposer role")
panic("sequencer is no longer the proposer")
}

// validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution
err = m.ValidateConfigWithRollappParams()
if err != nil {
return err
}

return nil
}

Expand All @@ -120,32 +142,28 @@ func (m *Manager) attemptApplyCachedBlocks() error {
for {
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache[expectedHeight]
cachedBlock, blockExists := m.blockCache.Get(expectedHeight)
if !blockExists {
break
}
if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil {
delete(m.blockCache, cachedBlock.Block.Header.Height)
/// TODO: can we take an action here such as dropping the peer / reducing their reputation?
if err := m.validateBlockBeforeApply(cachedBlock.Block, cachedBlock.Commit); err != nil {
m.blockCache.Delete(cachedBlock.Block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, blockMetaData{source: gossipedBlock})
err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
m.logger.Info("Block applied", "height", expectedHeight)

delete(m.blockCache, cachedBlock.Block.Header.Height)
}

return nil
}

func (m *Manager) validateBlock(block *types.Block, commit *types.Commit) error {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.SLClient.GetProposer()

return types.ValidateProposedTransition(m.State, block, commit, proposer)
// This function validates the block and commit against the state before applying it.
func (m *Manager) validateBlockBeforeApply(block *types.Block, commit *types.Commit) error {
return types.ValidateProposedTransition(m.State, block, commit, m.GetProposerPubKey())
}
34 changes: 34 additions & 0 deletions block/block_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package block

import (
"github.com/dymensionxyz/dymint/types"
)

type Cache struct {
// concurrency managed by Manager.retrieverMu mutex
cache map[uint64]types.CachedBlock
}

func (m *Cache) Add(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source}
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) Delete(h uint64) {
delete(m.cache, h)
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) Get(h uint64) (types.CachedBlock, bool) {
ret, found := m.cache[h]
return ret, found
}

func (m *Cache) Has(h uint64) bool {
_, found := m.Get(h)
return found
}

func (m *Cache) Size() int {
return len(m.cache)
}
1 change: 0 additions & 1 deletion block/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ import "errors"
var (
ErrNonRecoverable = errors.New("non recoverable")
ErrRecoverable = errors.New("recoverable")
ErrWrongDA = errors.New("wrong DA")
)
Loading
Loading