diff --git a/api/clients/mock/node_client.go b/api/clients/mock/node_client.go index 42228ae95d..6e13ee61b3 100644 --- a/api/clients/mock/node_client.go +++ b/api/clients/mock/node_client.go @@ -54,9 +54,18 @@ func (c *MockNodeClient) GetChunks( ) { args := c.Called(opID, opInfo, batchHeaderHash, blobIndex) encodedBlob := (args.Get(0)).(core.EncodedBlob) + chunks, err := encodedBlob.EncodedBundlesByOperator[opID][quorumID].ToFrames() + if err != nil { + chunksChan <- clients.RetrievedChunks{ + OperatorID: opID, + Err: err, + Chunks: nil, + } + return + } chunksChan <- clients.RetrievedChunks{ OperatorID: opID, Err: nil, - Chunks: encodedBlob.BundlesByOperator[opID][quorumID], + Chunks: chunks, } } diff --git a/api/clients/retrieval_client_test.go b/api/clients/retrieval_client_test.go index bd2e12f752..2eb1b72f40 100644 --- a/api/clients/retrieval_client_test.go +++ b/api/clients/retrieval_client_test.go @@ -60,8 +60,8 @@ var ( retrievalClient clients.RetrievalClient blobHeader *core.BlobHeader encodedBlob core.EncodedBlob = core.EncodedBlob{ - BlobHeader: nil, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: nil, + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } batchHeaderHash [32]byte batchRoot [32]byte @@ -198,7 +198,11 @@ func setup(t *testing.T) { bundles := make(map[core.QuorumID]core.Bundle, len(blobHeader.QuorumInfos)) bundles[quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks] encodedBlob.BlobHeader = blobHeader - encodedBlob.BundlesByOperator[id] = bundles + eb, err := core.Bundles(bundles).ToEncodedBundles() + if err != nil { + t.Fatal(err) + } + encodedBlob.EncodedBundlesByOperator[id] = eb } } diff --git a/core/data.go b/core/data.go index 7829ef3ad6..45f9e750f8 100644 --- a/core/data.go +++ b/core/data.go @@ -87,6 +87,49 @@ func (cd *ChunksData) Size() uint64 { return size } +func (cd *ChunksData) FromFrames(fr []*encoding.Frame) (*ChunksData, error) { + if len(fr) == 0 
{ + return nil, errors.New("no frame is provided") + } + var c ChunksData + c.Format = GnarkChunkEncodingFormat + c.ChunkLen = fr[0].Length() + c.Chunks = make([][]byte, 0, len(fr)) + for _, f := range fr { + bytes, err := f.SerializeGnark() + if err != nil { + return nil, err + } + c.Chunks = append(c.Chunks, bytes) + } + return &c, nil +} + +func (cd *ChunksData) ToFrames() ([]*encoding.Frame, error) { + frames := make([]*encoding.Frame, 0, len(cd.Chunks)) + switch cd.Format { + case GobChunkEncodingFormat: + for _, data := range cd.Chunks { + fr, err := new(encoding.Frame).Deserialize(data) + if err != nil { + return nil, err + } + frames = append(frames, fr) + } + case GnarkChunkEncodingFormat: + for _, data := range cd.Chunks { + fr, err := new(encoding.Frame).DeserializeGnark(data) + if err != nil { + return nil, err + } + frames = append(frames, fr) + } + default: + return nil, fmt.Errorf("invalid chunk encoding format: %v", cd.Format) + } + return frames, nil +} + func (cd *ChunksData) FlattenToBundle() ([]byte, error) { // Only Gnark coded chunks are dispersed as a byte array. // Gob coded chunks are not flattened. 
@@ -128,8 +171,9 @@ func (cd *ChunksData) ToGobFormat() (*ChunksData, error) { gobChunks = append(gobChunks, gob) } return &ChunksData{ - Chunks: gobChunks, - Format: GobChunkEncodingFormat, + Chunks: gobChunks, + Format: GobChunkEncodingFormat, + ChunkLen: cd.ChunkLen, }, nil } @@ -153,8 +197,9 @@ func (cd *ChunksData) ToGnarkFormat() (*ChunksData, error) { gnarkChunks = append(gnarkChunks, gnark) } return &ChunksData{ - Chunks: gnarkChunks, - Format: GnarkChunkEncodingFormat, + Chunks: gnarkChunks, + Format: GnarkChunkEncodingFormat, + ChunkLen: cd.ChunkLen, }, nil } @@ -266,6 +311,8 @@ type BatchHeader struct { type EncodedBlob struct { BlobHeader *BlobHeader BundlesByOperator map[OperatorID]Bundles + // EncodedBundlesByOperator is bundles in encoded format (not deserialized) + EncodedBundlesByOperator map[OperatorID]EncodedBundles } // A Bundle is the collection of chunks associated with a single blob, for a single operator and a single quorum. @@ -274,12 +321,23 @@ type Bundle []*encoding.Frame // Bundles is the collection of bundles associated with a single blob and a single operator. type Bundles map[QuorumID]Bundle +// This is similar to Bundle, but tracks chunks in encoded format (i.e. not deserialized). +type EncodedBundles map[QuorumID]*ChunksData + // BlobMessage is the message that is sent to DA nodes. It contains the blob header and the associated chunk bundles. type BlobMessage struct { BlobHeader *BlobHeader Bundles Bundles } +// This is similar to BlobMessage, but keep the commitments and chunks in encoded format +// (i.e. not deserialized) +type EncodedBlobMessage struct { + // TODO(jianoaix): Change the commitments to encoded format. 
+ BlobHeader *BlobHeader + EncodedBundles map[QuorumID]*ChunksData +} + func (b Bundle) Size() uint64 { size := uint64(0) for _, chunk := range b { @@ -388,3 +446,27 @@ func (cb Bundles) Size() uint64 { } return size } + +func (cb Bundles) ToEncodedBundles() (EncodedBundles, error) { + eb := make(EncodedBundles) + for quorum, bundle := range cb { + cd, err := new(ChunksData).FromFrames(bundle) + if err != nil { + return nil, err + } + eb[quorum] = cd + } + return eb, nil +} + +func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) { + c := make(Bundles) + for quorum, chunkData := range eb { + fr, err := chunkData.ToFrames() + if err != nil { + return nil, err + } + c[quorum] = fr + } + return c, nil +} diff --git a/core/data_test.go b/core/data_test.go index c062c72442..84cb5097e9 100644 --- a/core/data_test.go +++ b/core/data_test.go @@ -33,6 +33,52 @@ func createBundle(t *testing.T, numFrames, numCoeffs, seed int) core.Bundle { return frames } +func createChunksData(t *testing.T, seed int) (core.Bundle, *core.ChunksData, *core.ChunksData) { + bundle := createBundle(t, 64, 64, seed) + gobChunks := make([][]byte, len(bundle)) + gnarkChunks := make([][]byte, len(bundle)) + for i, frame := range bundle { + gobChunk, err := frame.Serialize() + assert.Nil(t, err) + gobChunks[i] = gobChunk + + gnarkChunk, err := frame.SerializeGnark() + assert.Nil(t, err) + gnarkChunks[i] = gnarkChunk + } + gob := &core.ChunksData{ + Chunks: gobChunks, + Format: core.GobChunkEncodingFormat, + ChunkLen: 64, + } + gnark := &core.ChunksData{ + Chunks: gnarkChunks, + Format: core.GnarkChunkEncodingFormat, + ChunkLen: 64, + } + return bundle, gob, gnark +} + +func checkChunksDataEquivalence(t *testing.T, cd1, cd2 *core.ChunksData) { + assert.Equal(t, cd1.Format, cd2.Format) + assert.Equal(t, cd1.ChunkLen, cd2.ChunkLen) + assert.Equal(t, len(cd1.Chunks), len(cd2.Chunks)) + for i, c1 := range cd1.Chunks { + assert.True(t, bytes.Equal(c1, cd2.Chunks[i])) + } +} + +func 
checkBundleEquivalence(t *testing.T, b1, b2 core.Bundle) { + assert.Equal(t, len(b1), len(b2)) + for i := 0; i < len(b1); i++ { + assert.True(t, b1[i].Proof.Equal(&b2[i].Proof)) + assert.Equal(t, len(b1[i].Coeffs), len(b2[i].Coeffs)) + for j := 0; j < len(b1[i].Coeffs); j++ { + assert.True(t, b1[i].Coeffs[j].Equal(&b2[i].Coeffs[j])) + } + } +} + func TestInvalidBundleSer(t *testing.T) { b1 := createBundle(t, 1, 0, 0) _, err := b1.Serialize() @@ -86,41 +132,38 @@ func TestBundleEncoding(t *testing.T) { assert.Nil(t, err) decoded, err := new(core.Bundle).Deserialize(bytes) assert.Nil(t, err) - assert.Equal(t, len(bundle), len(decoded)) - for i := 0; i < len(bundle); i++ { - assert.True(t, bundle[i].Proof.Equal(&decoded[i].Proof)) - assert.Equal(t, len(bundle[i].Coeffs), len(decoded[i].Coeffs)) - for j := 0; j < len(bundle[i].Coeffs); j++ { - assert.True(t, bundle[i].Coeffs[j].Equal(&decoded[i].Coeffs[j])) - } - } + checkBundleEquivalence(t, bundle, decoded) } } -func createChunksData(t *testing.T, seed int) (core.Bundle, *core.ChunksData, *core.ChunksData) { - bundle := createBundle(t, 64, 64, seed) - gobChunks := make([][]byte, len(bundle)) - gnarkChunks := make([][]byte, len(bundle)) - for i, frame := range bundle { - gobChunk, err := frame.Serialize() +func TestEncodedBundles(t *testing.T) { + numTrials := 16 + for i := 0; i < numTrials; i++ { + bundles := core.Bundles(map[core.QuorumID]core.Bundle{ + 0: createBundle(t, 64, 64, i), + 1: createBundle(t, 64, 64, i+numTrials), + }) + // ToEncodedBundles + ec, err := bundles.ToEncodedBundles() assert.Nil(t, err) - gobChunks[i] = gobChunk - - gnarkChunk, err := frame.SerializeGnark() + assert.Equal(t, len(ec), len(bundles)) + for quorum, bundle := range bundles { + cd, ok := ec[quorum] + assert.True(t, ok) + fr, err := cd.ToFrames() + assert.Nil(t, err) + checkBundleEquivalence(t, fr, bundle) + } + // FromEncodedBundles + bundles2, err := new(core.Bundles).FromEncodedBundles(ec) assert.Nil(t, err) - gnarkChunks[i] = 
gnarkChunk - } - gob := &core.ChunksData{ - Chunks: gobChunks, - Format: core.GobChunkEncodingFormat, - ChunkLen: 64, - } - gnark := &core.ChunksData{ - Chunks: gnarkChunks, - Format: core.GnarkChunkEncodingFormat, - ChunkLen: 64, + assert.Equal(t, len(bundles2), len(bundles)) + for quorum, bundle := range bundles { + b, ok := bundles2[quorum] + assert.True(t, ok) + checkBundleEquivalence(t, b, bundle) + } } - return bundle, gob, gnark } func TestChunksData(t *testing.T) { @@ -136,26 +179,31 @@ func TestChunksData(t *testing.T) { assert.Equal(t, convertedGob, gob) convertedGob, err = gnark.ToGobFormat() assert.Nil(t, err) - assert.Equal(t, len(gob.Chunks), len(convertedGob.Chunks)) - for i := 0; i < len(gob.Chunks); i++ { - assert.True(t, bytes.Equal(gob.Chunks[i], convertedGob.Chunks[i])) - } + checkChunksDataEquivalence(t, gob, convertedGob) // ToGnarkFormat convertedGnark, err := gnark.ToGnarkFormat() assert.Nil(t, err) assert.Equal(t, convertedGnark, gnark) convertedGnark, err = gob.ToGnarkFormat() assert.Nil(t, err) - assert.Equal(t, len(gnark.Chunks), len(convertedGnark.Chunks)) - for i := 0; i < len(gnark.Chunks); i++ { - assert.True(t, bytes.Equal(gnark.Chunks[i], convertedGnark.Chunks[i])) - } + checkChunksDataEquivalence(t, gnark, convertedGnark) // FlattenToBundle bytesFromChunksData, err := gnark.FlattenToBundle() assert.Nil(t, err) bytesFromBundle, err := bundle.Serialize() assert.Nil(t, err) assert.True(t, bytes.Equal(bytesFromChunksData, bytesFromBundle)) + // FromFrames + cd, err := new(core.ChunksData).FromFrames(bundle) + assert.Nil(t, err) + checkChunksDataEquivalence(t, cd, gnark) + // ToFrames + fr1, err := gob.ToFrames() + assert.Nil(t, err) + checkBundleEquivalence(t, bundle, fr1) + fr2, err := gnark.ToFrames() + assert.Nil(t, err) + checkBundleEquivalence(t, bundle, fr2) // Invalid cases gnark.Chunks[0] = gnark.Chunks[0][1:] _, err = gnark.FlattenToBundle() diff --git a/core/test/core_test.go b/core/test/core_test.go index 
04dd74a79e..9ee1ec6b10 100644 --- a/core/test/core_test.go +++ b/core/test/core_test.go @@ -113,8 +113,8 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) blobHeaders[z] = blobHeader encodedBlob := core.EncodedBlob{ - BlobHeader: blobHeader, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: blobHeader, + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } encodedBlobs[z] = encodedBlob @@ -156,6 +156,14 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) if err != nil { t.Fatal(err) } + bytes := make([][]byte, 0, len(chunks)) + for _, c := range chunks { + serialized, err := c.Serialize() + if err != nil { + t.Fatal(err) + } + bytes = append(bytes, serialized) + } blobHeader.BlobCommitments = encoding.BlobCommitments{ Commitment: commitments.Commitment, @@ -167,13 +175,18 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) blobHeader.QuorumInfos = append(blobHeader.QuorumInfos, quorumHeader) for id, assignment := range assignments { - _, ok := encodedBlob.BundlesByOperator[id] + chunksData := &core.ChunksData{ + Format: core.GobChunkEncodingFormat, + ChunkLen: int(chunkLength), + Chunks: bytes[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks], + } + _, ok := encodedBlob.EncodedBundlesByOperator[id] if !ok { - encodedBlob.BundlesByOperator[id] = map[core.QuorumID]core.Bundle{ - quorumID: chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks], + encodedBlob.EncodedBundlesByOperator[id] = map[core.QuorumID]*core.ChunksData{ + quorumID: chunksData, } } else { - encodedBlob.BundlesByOperator[id][quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks] + encodedBlob.EncodedBundlesByOperator[id][quorumID] = chunksData } } @@ -207,9 +220,13 @@ func checkBatchByUniversalVerifier(cst core.IndexedChainState, encodedBlobs []co val.UpdateOperatorID(id) blobMessages := 
make([]*core.BlobMessage, numBlob) for z, encodedBlob := range encodedBlobs { + bundles, err := new(core.Bundles).FromEncodedBundles(encodedBlob.EncodedBundlesByOperator[id]) + if err != nil { + return err + } blobMessages[z] = &core.BlobMessage{ BlobHeader: encodedBlob.BlobHeader, - Bundles: encodedBlob.BundlesByOperator[id], + Bundles: bundles, } } err := val.ValidateBatch(&header, blobMessages, state.OperatorState, pool) diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 29786e9205..0a2e8d91a7 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -227,7 +227,7 @@ func TestBatcherIterations(t *testing.T) { assert.NoError(t, err) count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, 2, count) - assert.Equal(t, uint64(24576), size) // Robert checks it + assert.Equal(t, uint64(27631), size) txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) components.transactor.On("BuildConfirmBatchTxn", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index 5ce98d6d71..6deccee54c 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -29,7 +29,7 @@ type EncodingResult struct { ReferenceBlockNumber uint BlobQuorumInfo *core.BlobQuorumInfo Commitment *encoding.BlobCommitments - Chunks []*encoding.Frame + ChunksData *core.ChunksData Assignments map[core.OperatorID]core.Assignment } @@ -197,5 +197,8 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID { // getChunksSize returns the total size of all the chunks in the encoded result in bytes func getChunksSize(result *EncodingResult) uint64 { - return core.Bundle(result.Chunks).Size() + if result == nil || result.ChunksData == nil { + return 0 + } + return 
result.ChunksData.Size() } diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 3b22a3f829..99173f6f83 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -386,7 +386,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata ReferenceBlockNumber: referenceBlockNumber, BlobQuorumInfo: res.BlobQuorumInfo, Commitment: commits, - Chunks: chunks, + ChunksData: chunks, Assignments: res.Assignments, }, Err: nil, @@ -482,19 +482,22 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) } blobHeaderByKey[blobKey] = blobHeader encodedBlobByKey[blobKey] = core.EncodedBlob{ - BlobHeader: blobHeader, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: blobHeader, + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } } // Populate the assigned bundles for opID, assignment := range result.Assignments { - bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID] + bundles, ok := encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] if !ok { - encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles) - bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID] + encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(core.EncodedBundles) + bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } - bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + bundles[result.BlobQuorumInfo.QuorumID] = new(core.ChunksData) + bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format + bundles[result.BlobQuorumInfo.QuorumID].Chunks = append(bundles[result.BlobQuorumInfo.QuorumID].Chunks, result.ChunksData.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) 
+ bundles[result.BlobQuorumInfo.QuorumID].ChunkLen = result.ChunksData.ChunkLen } blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo) @@ -631,19 +634,22 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { } blobHeaderByKey[blobKey] = blobHeader encodedBlobByKey[blobKey] = core.EncodedBlob{ - BlobHeader: blobHeader, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: blobHeader, + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } } // Populate the assigned bundles for opID, assignment := range result.Assignments { - bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID] + bundles, ok := encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] if !ok { - encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles) - bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID] + encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(core.EncodedBundles) + bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } - bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + bundles[result.BlobQuorumInfo.QuorumID] = new(core.ChunksData) + bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format + bundles[result.BlobQuorumInfo.QuorumID].Chunks = append(bundles[result.BlobQuorumInfo.QuorumID].Chunks, result.ChunksData.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) 
+ bundles[result.BlobQuorumInfo.QuorumID].ChunkLen = result.ChunksData.ChunkLen } blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo) diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index d84ca03492..5c54dfc880 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -142,7 +142,7 @@ func TestEncodingQueueLimit(t *testing.T) { } func TestBatchTrigger(t *testing.T) { - encodingStreamer, c := createEncodingStreamer(t, 10, 20_000, streamerConfig) + encodingStreamer, c := createEncodingStreamer(t, 10, 30_000, streamerConfig) blob := makeTestBlob([]*core.SecurityParam{{ QuorumID: 0, @@ -160,7 +160,7 @@ func TestBatchTrigger(t *testing.T) { assert.Nil(t, err) count, size := encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // try encode the same blobs again at different block (this happens when the blob is retried) encodingStreamer.ReferenceBlockNumber = 11 @@ -171,7 +171,7 @@ func TestBatchTrigger(t *testing.T) { count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // don't notify yet select { @@ -190,7 +190,7 @@ func TestBatchTrigger(t *testing.T) { count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 2) - assert.Equal(t, size, uint64(16384)*2) + assert.Equal(t, size, uint64(26630)*2) // notify select { @@ -246,12 +246,12 @@ func TestStreamingEncoding(t *testing.T) { assert.NotNil(t, encodedResult.Commitment.LengthProof) assert.Greater(t, encodedResult.Commitment.Length, uint(0)) assert.Len(t, encodedResult.Assignments, numOperators) - assert.Len(t, encodedResult.Chunks, 32) + assert.Len(t, encodedResult.ChunksData.Chunks, 32) isRequested = 
encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey, core.QuorumID(0), 10) assert.True(t, isRequested) count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // Cancel previous blob so it doesn't get reencoded. err = c.blobStore.MarkBlobFailed(ctx, metadataKey) @@ -281,7 +281,7 @@ func TestStreamingEncoding(t *testing.T) { assert.True(t, isRequested) count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // Request the same blob, which should be dedupped _, err = c.blobStore.StoreBlob(ctx, &blob, requestedAt) @@ -292,7 +292,7 @@ func TestStreamingEncoding(t *testing.T) { // It should not have been added to the encoded blob store count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) } func TestEncodingFailure(t *testing.T) { @@ -445,7 +445,7 @@ func TestPartialBlob(t *testing.T) { // Check EncodedBlobs assert.Len(t, batch.EncodedBlobs, 1) - assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators) + assert.Len(t, batch.EncodedBlobs[0].EncodedBundlesByOperator, numOperators) encodedBlob1 := batch.EncodedBlobs[0] assert.NotNil(t, encodedBlob1) @@ -465,10 +465,10 @@ func TestPartialBlob(t *testing.T) { }}) assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader) - assert.Len(t, encodedBlob1.BundlesByOperator, numOperators) - for _, bundles := range encodedBlob1.BundlesByOperator { + assert.Len(t, encodedBlob1.EncodedBundlesByOperator, numOperators) + for _, bundles := range encodedBlob1.EncodedBundlesByOperator { assert.Len(t, bundles, 1) - assert.Greater(t, len(bundles[0]), 0) + assert.Greater(t, len(bundles[0].Chunks), 0) break } @@ -674,7 +674,7 @@ func TestGetBatch(t *testing.T) 
{ // Check EncodedBlobs assert.Len(t, batch.EncodedBlobs, 2) - assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators) + assert.Len(t, batch.EncodedBlobs[0].EncodedBundlesByOperator, numOperators) var encodedBlob1 core.EncodedBlob var encodedBlob2 core.EncodedBlob @@ -718,10 +718,10 @@ func TestGetBatch(t *testing.T) { }) assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader) - for _, bundles := range encodedBlob1.BundlesByOperator { + for _, bundles := range encodedBlob1.EncodedBundlesByOperator { assert.Len(t, bundles, 2) - assert.Greater(t, len(bundles[0]), 0) - assert.Greater(t, len(bundles[1]), 0) + assert.Greater(t, len(bundles[0].Chunks), 0) + assert.Greater(t, len(bundles[1].Chunks), 0) break } @@ -739,9 +739,9 @@ func TestGetBatch(t *testing.T) { }, ChunkLength: 8, }}) - for _, bundles := range encodedBlob2.BundlesByOperator { + for _, bundles := range encodedBlob2.EncodedBundlesByOperator { assert.Len(t, bundles, 1) - assert.Greater(t, len(bundles[core.QuorumID(2)]), 0) + assert.Greater(t, len(bundles[core.QuorumID(2)].Chunks), 0) break } assert.Len(t, batch.BlobHeaders, 2) @@ -842,7 +842,7 @@ func TestCreateMinibatch(t *testing.T) { // Check EncodedBlobs assert.Len(t, batch.EncodedBlobs, 2) - assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators) + assert.Len(t, batch.EncodedBlobs[0].EncodedBundlesByOperator, numOperators) var encodedBlob1 core.EncodedBlob var encodedBlob2 core.EncodedBlob @@ -886,10 +886,10 @@ func TestCreateMinibatch(t *testing.T) { }) assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader) - for _, bundles := range encodedBlob1.BundlesByOperator { + for _, bundles := range encodedBlob1.EncodedBundlesByOperator { assert.Len(t, bundles, 2) - assert.Greater(t, len(bundles[0]), 0) - assert.Greater(t, len(bundles[1]), 0) + assert.Greater(t, len(bundles[0].Chunks), 0) + assert.Greater(t, len(bundles[1].Chunks), 0) break } @@ -907,9 +907,9 @@ func TestCreateMinibatch(t *testing.T) { }, ChunkLength: 
8, }}) - for _, bundles := range encodedBlob2.BundlesByOperator { + for _, bundles := range encodedBlob2.EncodedBundlesByOperator { assert.Len(t, bundles, 1) - assert.Greater(t, len(bundles[core.QuorumID(2)]), 0) + assert.Greater(t, len(bundles[core.QuorumID(2)].Chunks), 0) break } assert.Len(t, batch.BlobHeaders, 2) diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 1ea3375444..6597492722 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -52,20 +52,20 @@ func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) { for id, op := range state.IndexedOperators { go func(op core.IndexedOperatorInfo, id core.OperatorID) { - blobMessages := make([]*core.BlobMessage, 0) + blobMessages := make([]*core.EncodedBlobMessage, 0) hasAnyBundles := false batchHeaderHash, err := batchHeader.GetBatchHeaderHash() if err != nil { return } for _, blob := range blobs { - if _, ok := blob.BundlesByOperator[id]; ok { + if _, ok := blob.EncodedBundlesByOperator[id]; ok { hasAnyBundles = true } - blobMessages = append(blobMessages, &core.BlobMessage{ + blobMessages = append(blobMessages, &core.EncodedBlobMessage{ BlobHeader: blob.BlobHeader, // Bundles will be empty if the operator is not in the quorums blob is dispersed on - Bundles: blob.BundlesByOperator[id], + EncodedBundles: blob.EncodedBundlesByOperator[id], }) } if !hasAnyBundles { @@ -111,7 +111,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } } -func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { +func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMessage, 
batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { // TODO Add secure Grpc conn, err := grpc.Dial( @@ -152,7 +152,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, // SendBlobsToOperator sends blobs to an operator via the node's StoreBlobs endpoint // It returns the signatures of the blobs sent to the operator in the same order as the blobs // with nil values for blobs that were not attested by the operator -func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { +func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { // TODO Add secure Grpc conn, err := grpc.Dial( @@ -280,7 +280,7 @@ func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalCl return &core.Signature{G1Point: point}, nil } -func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) { +func GetStoreChunksRequest(blobMessages []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { @@ -289,7 +289,7 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B if err != nil { return nil, 0, err } - totalSize += int64(blob.Bundles.Size()) + totalSize += getBundlesSize(blob) } request := &node.StoreChunksRequest{ @@ -300,7 +300,7 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B return request, totalSize, nil } -func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) 
(*node.StoreBlobsRequest, int64, error) { +func GetStoreBlobsRequest(blobMessages []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreBlobsRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { @@ -309,7 +309,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba if err != nil { return nil, 0, err } - totalSize += int64(blob.Bundles.Size()) + totalSize += getBundlesSize(blob) } request := &node.StoreBlobsRequest{ @@ -320,7 +320,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba return request, totalSize, nil } -func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) { +func getBlobMessage(blob *core.EncodedBlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) { if blob.BlobHeader == nil { return nil, errors.New("blob header is nil") } @@ -357,13 +357,20 @@ func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node. } } + var err error bundles := make([]*node.Bundle, len(quorumHeaders)) if useGnarkBundleEncoding { // the ordering of quorums in bundles must be same as in quorumHeaders for i, quorumHeader := range quorumHeaders { quorum := quorumHeader.QuorumId - if bundle, ok := blob.Bundles[uint8(quorum)]; ok { - bundleBytes, err := bundle.Serialize() + if chunksData, ok := blob.EncodedBundles[uint8(quorum)]; ok { + if chunksData.Format != core.GnarkChunkEncodingFormat { + chunksData, err = chunksData.ToGnarkFormat() + if err != nil { + return nil, err + } + } + bundleBytes, err := chunksData.FlattenToBundle() if err != nil { return nil, err } @@ -378,16 +385,18 @@ func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node. 
} } } else { - data, err := blob.Bundles.Serialize() - if err != nil { - return nil, err - } // the ordering of quorums in bundles must be same as in quorumHeaders for i, quorumHeader := range quorumHeaders { quorum := quorumHeader.QuorumId - if _, ok := blob.Bundles[uint8(quorum)]; ok { + if chunksData, ok := blob.EncodedBundles[uint8(quorum)]; ok { + if chunksData.Format != core.GobChunkEncodingFormat { + chunksData, err = chunksData.ToGobFormat() + if err != nil { + return nil, err + } + } bundles[i] = &node.Bundle{ - Chunks: data[quorum], + Chunks: chunksData.Chunks, } } else { bundles[i] = &node.Bundle{ @@ -417,3 +426,11 @@ func getBatchHeaderMessage(header *core.BatchHeader) *node.BatchHeader { ReferenceBlockNumber: uint32(header.ReferenceBlockNumber), } } + +func getBundlesSize(blob *core.EncodedBlobMessage) int64 { + size := int64(0) + for _, bundle := range blob.EncodedBundles { + size += int64(bundle.Size()) + } + return size +} diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index dfd13419d6..9ae7d1517c 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -314,16 +314,16 @@ func (b *Minibatcher) SendBlobsToOperatorWithRetries( opID core.OperatorID, maxNumRetries int, ) ([]*core.Signature, error) { - blobMessages := make([]*core.BlobMessage, 0) + blobMessages := make([]*core.EncodedBlobMessage, 0) hasAnyBundles := false for _, blob := range blobs { - if _, ok := blob.BundlesByOperator[opID]; ok { + if _, ok := blob.EncodedBundlesByOperator[opID]; ok { hasAnyBundles = true } - blobMessages = append(blobMessages, &core.BlobMessage{ + blobMessages = append(blobMessages, &core.EncodedBlobMessage{ BlobHeader: blob.BlobHeader, // Bundles will be empty if the operator is not in the quorums blob is dispersed on - Bundles: blob.BundlesByOperator[opID], + EncodedBundles: blob.EncodedBundlesByOperator[opID], }) } if !hasAnyBundles { diff --git a/disperser/disperser.go b/disperser/disperser.go index 
cae2980df3..f69f7bfc89 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -187,7 +187,7 @@ type BlobStore interface { type Dispatcher interface { DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage - SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) + SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) } diff --git a/disperser/encoder/client.go b/disperser/encoder/client.go index 6b3858a63b..8a72b08c85 100644 --- a/disperser/encoder/client.go +++ b/disperser/encoder/client.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder" "github.com/Layr-Labs/eigenda/encoding" @@ -24,7 +25,7 @@ func NewEncoderClient(addr string, timeout time.Duration) (disperser.EncoderClie }, nil } -func (c client) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) { +func (c client) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) { conn, err := grpc.Dial( c.addr, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -59,18 +60,17 @@ func (c client) EncodeBlob(ctx context.Context, data []byte, encodingParams enco if err != nil { 
return nil, nil, err } - chunks := make([]*encoding.Frame, len(reply.GetChunks())) - for i, chunk := range reply.GetChunks() { - deserialized, err := new(encoding.Frame).Deserialize(chunk) - if err != nil { - return nil, nil, err - } - chunks[i] = deserialized + chunksData := &core.ChunksData{ + Chunks: reply.GetChunks(), + // TODO(jianoaix): plumb the encoding format for the encoder server. For now it's fine + // as it's hard coded using Gob at Encoder server. + Format: core.GobChunkEncodingFormat, + ChunkLen: int(encodingParams.ChunkLength), } return &encoding.BlobCommitments{ Commitment: commitment, LengthCommitment: lengthCommitment, LengthProof: lengthProof, Length: uint(reply.GetCommitment().GetLength()), - }, chunks, nil + }, chunksData, nil } diff --git a/disperser/encoder_client.go b/disperser/encoder_client.go index 20857af9cd..daffd35136 100644 --- a/disperser/encoder_client.go +++ b/disperser/encoder_client.go @@ -3,9 +3,10 @@ package disperser import ( "context" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" ) type EncoderClient interface { - EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) + EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) } diff --git a/disperser/local_encoder_client.go b/disperser/local_encoder_client.go index b66cf79dfd..ed55efda0b 100644 --- a/disperser/local_encoder_client.go +++ b/disperser/local_encoder_client.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" ) @@ -21,7 +22,7 @@ func NewLocalEncoderClient(prover encoding.Prover) *LocalEncoderClient { } } -func (m *LocalEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) { +func (m 
*LocalEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) { m.mu.Lock() defer m.mu.Unlock() commits, chunks, err := m.prover.EncodeAndProve(data, encodingParams) @@ -29,5 +30,19 @@ func (m *LocalEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodi return nil, nil, err } - return &commits, chunks, nil + bytes := make([][]byte, 0, len(chunks)) + for _, c := range chunks { + serialized, err := c.Serialize() + if err != nil { + return nil, nil, err + } + bytes = append(bytes, serialized) + } + chunksData := &core.ChunksData{ + Chunks: bytes, + Format: core.GobChunkEncodingFormat, + ChunkLen: int(encodingParams.ChunkLength), + } + + return &commits, chunksData, nil } diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 743de8ebd9..e17dbd0d15 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -66,7 +66,7 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera return update } -func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { +func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { args := d.Called(ctx, blobs, batchHeader, op) if args.Get(0) == nil { return nil, args.Error(1) diff --git a/disperser/mock/encoder.go b/disperser/mock/encoder.go index 0aa6422434..c9d4d1babb 100644 --- a/disperser/mock/encoder.go +++ b/disperser/mock/encoder.go @@ -3,6 +3,7 @@ package mock import ( "context" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" "github.com/stretchr/testify/mock" @@ -18,15 +19,15 @@ func NewMockEncoderClient() *MockEncoderClient { return 
&MockEncoderClient{} } -func (m *MockEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) { +func (m *MockEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) { args := m.Called(ctx, data, encodingParams) var commitments *encoding.BlobCommitments if args.Get(0) != nil { commitments = args.Get(0).(*encoding.BlobCommitments) } - var chunks []*encoding.Frame + var chunks *core.ChunksData if args.Get(1) != nil { - chunks = args.Get(1).([]*encoding.Frame) + chunks = args.Get(1).(*core.ChunksData) } return commitments, chunks, args.Error(2) } diff --git a/node/grpc/server_load_test.go b/node/grpc/server_load_test.go index a0ab6bd72d..319da4a174 100644 --- a/node/grpc/server_load_test.go +++ b/node/grpc/server_load_test.go @@ -15,14 +15,14 @@ import ( "github.com/stretchr/testify/assert" ) -func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThreshold int, refBlockNumber uint) (*core.BatchHeader, map[core.OperatorID][]*core.BlobMessage) { +func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThreshold int, refBlockNumber uint) (*core.BatchHeader, map[core.OperatorID][]*core.EncodedBlobMessage) { p, _, err := makeTestComponents() assert.NoError(t, err) asn := &core.StdAssignmentCoordinator{} blobHeaders := make([]*core.BlobHeader, numBlobs) blobChunks := make([][]*encoding.Frame, numBlobs) - blobMessagesByOp := make(map[core.OperatorID][]*core.BlobMessage) + blobMessagesByOp := make(map[core.OperatorID][]*core.EncodedBlobMessage) for i := 0; i < numBlobs; i++ { // create data ranData := make([]byte, blobSize) @@ -67,6 +67,13 @@ func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThr assert.NoError(t, err) blobChunks[i] = chunks + chunkBytes := make([][]byte, 0, len(chunks)) + for _, c := range chunks { + 
serialized, err := c.Serialize() + assert.NoError(t, err) + chunkBytes = append(chunkBytes, serialized) + } + + // populate blob header blobHeaders[i] = &core.BlobHeader{ BlobCommitments: commits, @@ -75,11 +82,13 @@ func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThr // populate blob messages for opID, assignment := range quorumInfo.Assignments { - blobMessagesByOp[opID] = append(blobMessagesByOp[opID], &core.BlobMessage{ - BlobHeader: blobHeaders[i], - Bundles: make(core.Bundles), + blobMessagesByOp[opID] = append(blobMessagesByOp[opID], &core.EncodedBlobMessage{ + BlobHeader: blobHeaders[i], + EncodedBundles: make(core.EncodedBundles), }) - blobMessagesByOp[opID][i].Bundles[0] = append(blobMessagesByOp[opID][i].Bundles[0], chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + eb := &core.ChunksData{Format: core.GobChunkEncodingFormat, ChunkLen: int(params.ChunkLength)} + eb.Chunks = append(eb.Chunks, chunkBytes[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + blobMessagesByOp[opID][i].EncodedBundles[0] = eb } } @@ -100,7 +109,7 @@ func TestStoreChunks(t *testing.T) { batchHeader, blobMessagesByOp := makeBatch(t, 200*1024, 50, 80, 100, 1) numTotalChunks := 0 for i := range blobMessagesByOp[opID] { - numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0]) + numTotalChunks += len(blobMessagesByOp[opID][i].EncodedBundles[0].Chunks) } t.Logf("Batch numTotalChunks: %d", numTotalChunks) req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader, false)