Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(da): RetrieveBatchesV2 method #937

Draft
wants to merge 45 commits into
base: kirill/interchain-da
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
99b89e1
feat(da): RetrieveBatchesV2 method
keruch Jul 3, 2024
9f6c2fb
feat(da): adjustments after manual testing
keruch Jul 4, 2024
7cb2843
feat(da): added interchain-da proto contracts (#932)
keruch Jul 4, 2024
b09c75a
fix(rpc): panic and publish health event only on create batch error (…
zale144 Jul 9, 2024
2a5992c
feat(blockmanager): removed namespace from blockmanager (#943)
mtsitrin Jul 14, 2024
6a33534
fix(blockManager): multiple accumulateddata trigger (#960)
mtsitrin Jul 18, 2024
41d6053
fix(manager): Add start height to Submit batch to SL log message (#964)
zale144 Jul 18, 2024
9fa4109
fix(prune): fix guard for sequencer (#966)
danwt Jul 19, 2024
cce7cd3
fix(doc): remove misleading comment on last submitted height field (#…
danwt Jul 19, 2024
9da82f1
fix(sync): adds missing error log (#965)
danwt Jul 19, 2024
a9cbe2d
chore(code standards): fix wrong godoc for struct field (#976)
yuhangcangqian Jul 19, 2024
0d3be11
fix(p2p): set gossipsub buffersize to avoid missed blocks (#975)
srene Jul 22, 2024
242acb7
fix(submission): fix counting and time (#969)
danwt Jul 22, 2024
e922dea
feat(da): add metric for consecutive failed da submissions (#986)
srene Jul 26, 2024
c5f8f07
feat(rpc): Add sync info metrics (#979)
zale144 Jul 28, 2024
d57d5b7
feat(da): add default retry value for celestia (#985)
srene Jul 28, 2024
9e3201d
chore: add Initial changelog (#990)
hoangdv2429 Jul 30, 2024
402dd00
fix(rpc): Fix status `CatchingUp` field updating (#971)
zale144 Jul 31, 2024
83078f5
tests(pruning): Add pruning unit-test (#996)
mtsitrin Aug 6, 2024
93905ee
hotfix(submit): early catch and log for empty batch (#997)
danwt Aug 7, 2024
0bbe5be
fix(submit loop): add more logging around skew calculation (#1000)
danwt Aug 8, 2024
4c9c2c3
feat(p2p): block sync protocol (#915)
srene Aug 8, 2024
01fc2d5
fix(local pub sub): fix must subscribe to handle context cancelled (#…
danwt Aug 8, 2024
d3b4311
fix(test): submit loop test uses no op logger and prints more diagnos…
danwt Aug 9, 2024
603c160
refactor(p2p): rename p2p block event (#1006)
srene Aug 12, 2024
f3e69a3
fix(submit loop): correctly load commit on startup (#1011)
danwt Aug 12, 2024
fab177a
fix(manager): full-node syncing fix (#1013)
srene Aug 12, 2024
d31bcf1
code-standards(submit loop): small refactor to submit loop to move ti…
danwt Aug 13, 2024
7ee8303
fix(manager): unsubmitted bytes for batch calculation fix (#1019)
srene Aug 13, 2024
fd3cf3c
feat(da): submitted batch size metric (#1020)
srene Aug 15, 2024
25c069c
fix(p2p): improve blocksync logs (#1030)
srene Aug 21, 2024
88ba1fe
feat: sequencer rotation (#992)
mtsitrin Aug 22, 2024
54e2fa5
feat: rollapp consensus params (#991)
srene Aug 24, 2024
9aff932
feat: submit timestamps in block descriptors (#1032)
spoo-bar Aug 24, 2024
fe0f3cb
fix(manager): applylocalblock change mutex (#1036)
srene Aug 27, 2024
6bf24d0
fix(rpc): fix websocket subscription panic when no closing error (#1046)
srene Aug 27, 2024
e9709f6
fix(manager): gossip any pending blocks not gossiped before (in case …
srene Aug 28, 2024
f745143
feat(da): upgrade for new celestia-openrpc version for celestia-node …
srene Aug 28, 2024
562f673
fix(p2p): update height in blocksync loop (#1035)
srene Aug 28, 2024
d348ff0
fix(da): celestia openrpc version update (#1056)
srene Aug 30, 2024
5de7192
fix(manager): use block params from consensus param (#1042)
srene Sep 6, 2024
2da988f
refactor(manager): rename `rollappConsensusParams` to `rollappParams…
srene Sep 6, 2024
6f910b0
temp
keruch Sep 13, 2024
d3d58ff
Merge branch 'main' into kirill/interchain-da-retrieve-batch
keruch Sep 13, 2024
de50e94
merge commit
keruch Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,8 @@ type ResultRetrieveBatch struct {
// ResultRetrieveBatchV2 contains a batch of blocks returned from the DA layer client.
type ResultRetrieveBatchV2 struct {
BaseResult
// Batches is the full block retrieved from the DA layer.
// If BaseResult.Code is not StatusSuccess, this field is nil.
Batches []*types.Batch
// Batch is the full block retrieved from the DA layer.
Batch types.Batch
}

// Path TODO: move to the Dymension proto file
Expand Down Expand Up @@ -260,7 +259,7 @@ type ClientV2 interface {
Stop() error

// SubmitBatchV2 submits the passed in block to the DA layer.
SubmitBatchV2(*types.Batch) ResultSubmitBatchV2
SubmitBatchV2(types.Batch) ResultSubmitBatchV2

GetClientType() Client

Expand Down
10 changes: 10 additions & 0 deletions da/interchain/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func (c *daClient) Params(ctx context.Context) (interchainda.Params, error) {
return resp.GetParams(), nil
}

func (c *daClient) Blob(ctx context.Context, id interchainda.BlobID) (*interchainda.QueryBlobResponse, error) {
resp, err := c.queryClient.Blob(ctx, &interchainda.QueryBlobRequest{
BlobId: uint64(id),
})
if err != nil {
return nil, fmt.Errorf("can't query DA layer params: %w", err)
}
return resp, nil
}

func (c *daClient) GetTx(ctx context.Context, txHash string) (*tx.GetTxResponse, error) {
return c.txService.GetTx(ctx, &tx.GetTxRequest{Hash: txHash})
}
Expand Down
50 changes: 50 additions & 0 deletions da/interchain/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package interchain

import (
"errors"
"fmt"

"github.com/dymensionxyz/dymint/da/interchain/ioutils"
"github.com/dymensionxyz/dymint/types"
)

// EncodeBatch encodes the batch to the interchain DA layer format.
// The batch is represented in binary and gzipped.
func EncodeBatch(b types.Batch) ([]byte, error) {
// Prepare the blob data
blob, err := b.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("can't marshal batch: %w", err)
}

// Gzip the blob
gzipped, err := ioutils.Gzip(blob)
if err != nil {
return nil, fmt.Errorf("can't gzip batch: %w", err)
}

return gzipped, nil
}

// DecodeBatch decodes the batch from the interchain DA layer format.
// The incoming batch must be a gzipped binary.
func DecodeBatch(b []byte) (types.Batch, error) {
if !ioutils.IsGzip(b) {
return types.Batch{}, errors.New("batch is not gzip-compressed")
}

// Gunzip the blob
binary, err := ioutils.Gunzip(b)
if err != nil {
return types.Batch{}, fmt.Errorf("can't gunzip batch: %w", err)
}

// Prepare the blob data
var batch types.Batch
err = batch.UnmarshalBinary(binary)
if err != nil {
return types.Batch{}, fmt.Errorf("can't unmarshal batch: %w", err)
}
Comment on lines +38 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont need 'can't in the messages


return batch, nil
}
69 changes: 69 additions & 0 deletions da/interchain/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package interchain_test

import (
"reflect"
"testing"

"github.com/stretchr/testify/require"

"github.com/dymensionxyz/dymint/da/interchain"
"github.com/dymensionxyz/dymint/types"
)

func FuzzEncodeDecodeBatch1(f *testing.F) {
f.Add(uint64(0), uint64(0))

f.Fuzz(func(t *testing.T, h1, h2 uint64) {
// Generate batches with random headers
expected := types.Batch{
StartHeight: h1,
EndHeight: h2,
Blocks: []*types.Block{},
Commits: []*types.Commit{},
}

data, err := interchain.EncodeBatch(expected)
require.NoError(t, err)

actual, err := interchain.DecodeBatch(data)
require.NoError(t, err)

require.Equal(t, expected, actual)
})
}
Comment on lines +17 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what exactly do you expect to reveal by only fuzzing the heights?


func TestEncodeDecodeBatch(t *testing.T) {
expected := types.Batch{
StartHeight: 1,
EndHeight: 123,
Blocks: []*types.Block{
{
Header: types.Header{
Height: 1,
},
},
{
Header: types.Header{
Height: 2,
},
},
},
Commits: []*types.Commit{
{
Height: 1,
},
{
Height: 2,
},
},
}

data, err := interchain.EncodeBatch(expected)
require.NoError(t, err)

actual, err := interchain.DecodeBatch(data)
require.NoError(t, err)

require.True(t, reflect.DeepEqual(expected, actual))
require.Equal(t, expected, actual)
}
1 change: 1 addition & 0 deletions da/interchain/interchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type DAClient interface {
Context() sdkclient.Context
BroadcastTx(string, ...sdk.Msg) (cosmosclient.Response, error)
Params(context.Context) (interchainda.Params, error)
Blob(ctx context.Context, id interchainda.BlobID) (*interchainda.QueryBlobResponse, error)
GetTx(context.Context, string) (*tx.GetTxResponse, error)
ABCIQueryWithProof(ctx context.Context, path string, data bytes.HexBytes, height int64) (*ctypes.ResultABCIQuery, error)
}
Expand Down
16 changes: 11 additions & 5 deletions da/interchain/interchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// TODO: add interchain DA chain mock
func TestDALayerClient_Init(t *testing.T) {
func TestDALayerClient(t *testing.T) {
t.Skip() // Test is not finished yet

client := new(interchain.DALayerClient)
Expand All @@ -25,12 +25,18 @@ func TestDALayerClient_Init(t *testing.T) {
err = client.Init(rawConfig, nil, nil, logger)
require.NoError(t, err)

result := client.SubmitBatchV2(&types.Batch{
batch := types.Batch{
StartHeight: 1,
EndHeight: 3,
Blocks: []*types.Block{{Header: types.Header{Height: 1}}},
Commits: []*types.Commit{{Height: 1}},
})
require.NoError(t, result.Error)
t.Logf("result: %#v", result)
}

submitResult := client.SubmitBatchV2(batch)
require.NoError(t, submitResult.Error)

retrieveResult := client.RetrieveBatchesV2(submitResult)
require.NoError(t, retrieveResult.Error)

require.Equal(t, batch, retrieveResult.Batch)
}
54 changes: 51 additions & 3 deletions da/interchain/retrieve_batches.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,59 @@
package interchain

import "github.com/dymensionxyz/dymint/da"
import (
"fmt"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/types"
interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da"
)

func (c *DALayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
panic("RetrieveBatches method is not supported by the interchain DA clint")
}

func (c *DALayerClient) RetrieveBatchesV2(da.ResultSubmitBatchV2) da.ResultRetrieveBatchV2 {
panic("implement me")
func (c *DALayerClient) RetrieveBatchesV2(b da.ResultSubmitBatchV2) da.ResultRetrieveBatchV2 {
batch, err := c.retrieveBatches(b)
if err != nil {
return da.ResultRetrieveBatchV2{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: fmt.Sprintf("can't retrieve batch from the interchain DA layer: %s", err.Error()),
Error: err,
},
Batch: types.Batch{},
}
}

return da.ResultRetrieveBatchV2{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "Retrieve successful",
},
Batch: batch,
}
}

func (c *DALayerClient) retrieveBatches(b da.ResultSubmitBatchV2) (types.Batch, error) {
var commitment *interchainda.Commitment
err := c.cdc.UnpackAny(b.DAPath.Commitment, &commitment)
if err != nil {
return types.Batch{}, fmt.Errorf("can't unpack commitment: %w", err)
}

resp, err := c.daClient.Blob(c.ctx, interchainda.BlobID(commitment.BlobId))
if err != nil {
return types.Batch{}, fmt.Errorf("can't get blob from the interchain DA layer: %w", err)
}

if resp.BlobMetadata.BlobHash != commitment.BlobHash {
return types.Batch{}, fmt.Errorf("commitment blob hash doesn't match interchain DA layer blob hash")
}

batch, err := DecodeBatch(resp.Blob)
if err != nil {
return types.Batch{}, fmt.Errorf("can't decode batch from interchain DA layer: %w", err)
}

return batch, nil
}
18 changes: 5 additions & 13 deletions da/interchain/submit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/dymensionxyz/cosmosclient/cosmosclient"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/da/interchain/ioutils"
"github.com/dymensionxyz/dymint/types"
interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da"
)
Expand All @@ -22,7 +21,7 @@ func (c *DALayerClient) SubmitBatch(*types.Batch) da.ResultSubmitBatch {
panic("SubmitBatch method is not supported by the interchain DA clint")
}

func (c *DALayerClient) SubmitBatchV2(batch *types.Batch) da.ResultSubmitBatchV2 {
func (c *DALayerClient) SubmitBatchV2(batch types.Batch) da.ResultSubmitBatchV2 {
commitment, err := c.submitBatch(batch)
if err != nil {
return da.ResultSubmitBatchV2{
Expand Down Expand Up @@ -62,17 +61,10 @@ func (c *DALayerClient) SubmitBatchV2(batch *types.Batch) da.ResultSubmitBatchV2
}

// submitBatch is used to process and transmit batches to the interchain DA.
func (c *DALayerClient) submitBatch(batch *types.Batch) (*interchainda.Commitment, error) {
// Prepare the blob data
blob, err := batch.MarshalBinary()
func (c *DALayerClient) submitBatch(batch types.Batch) (*interchainda.Commitment, error) {
blob, err := EncodeBatch(batch)
if err != nil {
return nil, fmt.Errorf("can't marshal batch: %w", err)
}

// Gzip the blob
gzipped, err := ioutils.Gzip(blob)
if err != nil {
return nil, fmt.Errorf("can't gzip batch: %w", err)
return nil, fmt.Errorf("can't encode batch to interchain DA format: %w", err)
}

// Verify the size of the blob is within the limit
Expand All @@ -86,7 +78,7 @@ func (c *DALayerClient) submitBatch(batch *types.Batch) (*interchainda.Commitmen
// Prepare the message to be sent to the DA layer
msg := interchainda.MsgSubmitBlob{
Creator: c.accountAddress,
Blob: gzipped,
Blob: blob,
Fees: feesToPay,
}

Expand Down
Loading