Skip to content

Commit

Permalink
Merge branch 'master' into control-chunk-length
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph committed Aug 19, 2024
2 parents 5f44a1d + cacdc21 commit 757a4ae
Show file tree
Hide file tree
Showing 49 changed files with 2,572 additions and 791 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ We welcome all contributions! There are many ways to contribute to the project,
- [Open an Issue](https://github.com/Layr-Labs/eigenda/issues/new/choose)
- [EigenLayer/EigenDA forum](https://forum.eigenlayer.xyz/c/eigenda/9)
- [Email](mailto:[email protected])
- [Follow us on Twitter](https://twitter.com/eigen_da)
- [Follow us on X](https://x.com/eigen_da)
11 changes: 10 additions & 1 deletion api/clients/mock/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,18 @@ func (c *MockNodeClient) GetChunks(
) {
args := c.Called(opID, opInfo, batchHeaderHash, blobIndex)
encodedBlob := (args.Get(0)).(core.EncodedBlob)
chunks, err := encodedBlob.EncodedBundlesByOperator[opID][quorumID].ToFrames()
if err != nil {
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
}

}
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: nil,
Chunks: encodedBlob.BundlesByOperator[opID][quorumID],
Chunks: chunks,
}
}
20 changes: 10 additions & 10 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"errors"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/node"
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
node_utils "github.com/Layr-Labs/eigenda/node/grpc"
"github.com/Layr-Labs/eigenda/node"
"github.com/wealdtech/go-merkletree/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -50,11 +50,11 @@ func (c client) GetBlobHeader(
}
defer conn.Close()

n := node.NewRetrievalClient(conn)
n := grpcnode.NewRetrievalClient(conn)
nodeCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

request := &node.GetBlobHeaderRequest{
request := &grpcnode.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: blobIndex,
}
Expand All @@ -64,7 +64,7 @@ func (c client) GetBlobHeader(
return nil, nil, err
}

blobHeader, err := node_utils.GetBlobHeaderFromProto(reply.GetBlobHeader())
blobHeader, err := node.GetBlobHeaderFromProto(reply.GetBlobHeader())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -99,11 +99,11 @@ func (c client) GetChunks(
return
}

n := node.NewRetrievalClient(conn)
n := grpcnode.NewRetrievalClient(conn)
nodeCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

request := &node.RetrieveChunksRequest{
request := &grpcnode.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: blobIndex,
QuorumId: uint32(quorumID),
Expand All @@ -123,11 +123,11 @@ func (c client) GetChunks(
for i, data := range reply.GetChunks() {
var chunk *encoding.Frame
switch reply.GetChunkEncodingFormat() {
case node.ChunkEncodingFormat_GNARK:
case grpcnode.ChunkEncodingFormat_GNARK:
chunk, err = new(encoding.Frame).DeserializeGnark(data)
case node.ChunkEncodingFormat_GOB:
case grpcnode.ChunkEncodingFormat_GOB:
chunk, err = new(encoding.Frame).Deserialize(data)
case node.ChunkEncodingFormat_UNKNOWN:
case grpcnode.ChunkEncodingFormat_UNKNOWN:
// For backward compatibility, we fallback the UNKNOWN to GOB
chunk, err = new(encoding.Frame).Deserialize(data)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions api/clients/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ var (
retrievalClient clients.RetrievalClient
blobHeader *core.BlobHeader
encodedBlob core.EncodedBlob = core.EncodedBlob{
BlobHeader: nil,
BundlesByOperator: make(map[core.OperatorID]core.Bundles),
BlobHeader: nil,
EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles),
}
batchHeaderHash [32]byte
batchRoot [32]byte
Expand Down Expand Up @@ -198,7 +198,11 @@ func setup(t *testing.T) {
bundles := make(map[core.QuorumID]core.Bundle, len(blobHeader.QuorumInfos))
bundles[quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks]
encodedBlob.BlobHeader = blobHeader
encodedBlob.BundlesByOperator[id] = bundles
eb, err := core.Bundles(bundles).ToEncodedBundles()
if err != nil {
t.Fatal(err)
}
encodedBlob.EncodedBundlesByOperator[id] = eb
}

}
Expand Down
90 changes: 86 additions & 4 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,49 @@ func (cd *ChunksData) Size() uint64 {
return size
}

func (cd *ChunksData) FromFrames(fr []*encoding.Frame) (*ChunksData, error) {
if len(fr) == 0 {
return nil, errors.New("no frame is provided")
}
var c ChunksData
c.Format = GnarkChunkEncodingFormat
c.ChunkLen = fr[0].Length()
c.Chunks = make([][]byte, 0, len(fr))
for _, f := range fr {
bytes, err := f.SerializeGnark()
if err != nil {
return nil, err
}
c.Chunks = append(c.Chunks, bytes)
}
return &c, nil
}

func (cd *ChunksData) ToFrames() ([]*encoding.Frame, error) {
frames := make([]*encoding.Frame, 0, len(cd.Chunks))
switch cd.Format {
case GobChunkEncodingFormat:
for _, data := range cd.Chunks {
fr, err := new(encoding.Frame).Deserialize(data)
if err != nil {
return nil, err
}
frames = append(frames, fr)
}
case GnarkChunkEncodingFormat:
for _, data := range cd.Chunks {
fr, err := new(encoding.Frame).DeserializeGnark(data)
if err != nil {
return nil, err
}
frames = append(frames, fr)
}
default:
return nil, fmt.Errorf("invalid chunk encoding format: %v", cd.Format)
}
return frames, nil
}

func (cd *ChunksData) FlattenToBundle() ([]byte, error) {
// Only Gnark coded chunks are dispersed as a byte array.
// Gob coded chunks are not flattened.
Expand Down Expand Up @@ -128,8 +171,9 @@ func (cd *ChunksData) ToGobFormat() (*ChunksData, error) {
gobChunks = append(gobChunks, gob)
}
return &ChunksData{
Chunks: gobChunks,
Format: GobChunkEncodingFormat,
Chunks: gobChunks,
Format: GobChunkEncodingFormat,
ChunkLen: cd.ChunkLen,
}, nil
}

Expand All @@ -153,8 +197,9 @@ func (cd *ChunksData) ToGnarkFormat() (*ChunksData, error) {
gnarkChunks = append(gnarkChunks, gnark)
}
return &ChunksData{
Chunks: gnarkChunks,
Format: GnarkChunkEncodingFormat,
Chunks: gnarkChunks,
Format: GnarkChunkEncodingFormat,
ChunkLen: cd.ChunkLen,
}, nil
}

Expand Down Expand Up @@ -266,6 +311,8 @@ type BatchHeader struct {
type EncodedBlob struct {
BlobHeader *BlobHeader
BundlesByOperator map[OperatorID]Bundles
// EncodedBundlesByOperator is bundles in encoded format (not deserialized)
EncodedBundlesByOperator map[OperatorID]EncodedBundles
}

// A Bundle is the collection of chunks associated with a single blob, for a single operator and a single quorum.
Expand All @@ -274,12 +321,23 @@ type Bundle []*encoding.Frame
// Bundles is the collection of bundles associated with a single blob and a single operator.
type Bundles map[QuorumID]Bundle

// This is similar to Bundle, but tracks chunks in encoded format (i.e. not deserialized).
type EncodedBundles map[QuorumID]*ChunksData

// BlobMessage is the message that is sent to DA nodes. It contains the blob header and the associated chunk bundles.
type BlobMessage struct {
BlobHeader *BlobHeader
Bundles Bundles
}

// This is similar to BlobMessage, but keep the commitments and chunks in encoded format
// (i.e. not deserialized)
type EncodedBlobMessage struct {
// TODO(jianoaix): Change the commitments to encoded format.
BlobHeader *BlobHeader
EncodedBundles map[QuorumID]*ChunksData
}

func (b Bundle) Size() uint64 {
size := uint64(0)
for _, chunk := range b {
Expand Down Expand Up @@ -388,3 +446,27 @@ func (cb Bundles) Size() uint64 {
}
return size
}

func (cb Bundles) ToEncodedBundles() (EncodedBundles, error) {
eb := make(EncodedBundles)
for quorum, bundle := range cb {
cd, err := new(ChunksData).FromFrames(bundle)
if err != nil {
return nil, err
}
eb[quorum] = cd
}
return eb, nil
}

func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) {
c := make(Bundles)
for quorum, chunkData := range eb {
fr, err := chunkData.ToFrames()
if err != nil {
return nil, err
}
c[quorum] = fr
}
return c, nil
}
Loading

0 comments on commit 757a4ae

Please sign in to comment.