diff --git a/clients/mock/node_client.go b/clients/mock/node_client.go
index b6eda3573f..27d4e8fdbb 100644
--- a/clients/mock/node_client.go
+++ b/clients/mock/node_client.go
@@ -57,6 +57,6 @@ func (c *MockNodeClient) GetChunks(
 		chunksChan <- clients.RetrievedChunks{
 			OperatorID: opID,
 			Err: nil,
-			Chunks: encodedBlob[opID].Bundles[quorumID],
+			Chunks: encodedBlob.BundlesByOperator[opID][quorumID],
 		}
 	}
diff --git a/clients/tests/retrieval_client_test.go b/clients/tests/retrieval_client_test.go
index 1c859fb233..7f5684f53a 100644
--- a/clients/tests/retrieval_client_test.go
+++ b/clients/tests/retrieval_client_test.go
@@ -50,15 +50,18 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
 }
 
 var (
-	indexedChainState core.IndexedChainState
-	chainState core.ChainState
-	indexer *indexermock.MockIndexer
-	operatorState *core.OperatorState
-	nodeClient *clientsmock.MockNodeClient
-	coordinator *core.StdAssignmentCoordinator
-	retrievalClient clients.RetrievalClient
-	blobHeader *core.BlobHeader
-	encodedBlob core.EncodedBlob = make(core.EncodedBlob)
+	indexedChainState core.IndexedChainState
+	chainState core.ChainState
+	indexer *indexermock.MockIndexer
+	operatorState *core.OperatorState
+	nodeClient *clientsmock.MockNodeClient
+	coordinator *core.StdAssignmentCoordinator
+	retrievalClient clients.RetrievalClient
+	blobHeader *core.BlobHeader
+	encodedBlob core.EncodedBlob = core.EncodedBlob{
+		BlobHeader: nil,
+		BundlesByOperator: make(map[core.OperatorID]core.Bundles),
+	}
 	batchHeaderHash [32]byte
 	batchRoot [32]byte
 	gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.")
@@ -189,10 +192,8 @@ func setup(t *testing.T) {
 	for id, assignment := range assignments {
 		bundles := make(map[core.QuorumID]core.Bundle, len(blobHeader.QuorumInfos))
 		bundles[quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks]
-		encodedBlob[id] = &core.BlobMessage{
-			BlobHeader: blobHeader,
-			Bundles: bundles,
-		}
+		encodedBlob.BlobHeader = blobHeader
+		encodedBlob.BundlesByOperator[id] = bundles
 	}
 }
 
diff --git a/core/data.go b/core/data.go
index 9185edda5b..46fbfedd5a 100644
--- a/core/data.go
+++ b/core/data.go
@@ -138,7 +138,10 @@ type BatchHeader struct {
 }
 
 // EncodedBlob contains the messages to be sent to a group of DA nodes corresponding to a single blob
-type EncodedBlob = map[OperatorID]*BlobMessage
+type EncodedBlob struct {
+	BlobHeader *BlobHeader
+	BundlesByOperator map[OperatorID]Bundles
+}
 
 // A Bundle is the collection of chunks associated with a single blob, for a single operator and a single quorum.
 type Bundle = []*encoding.Frame
diff --git a/core/test/core_test.go b/core/test/core_test.go
index 80b4e1ff4a..6404a38e78 100644
--- a/core/test/core_test.go
+++ b/core/test/core_test.go
@@ -141,15 +141,15 @@ func prepareBatch(t *testing.T, cst core.IndexedChainState, blobs []core.Blob, q
 			QuorumInfos: []*core.BlobQuorumInfo{quorumHeader},
 		}
 
-		var encodedBlob core.EncodedBlob = make(map[core.OperatorID]*core.BlobMessage, len(assignments))
+		encodedBlob := core.EncodedBlob{
+			BundlesByOperator: make(map[core.OperatorID]core.Bundles),
+		}
 		for id, assignment := range assignments {
 			bundles := map[core.QuorumID]core.Bundle{
 				quorumID: chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks],
 			}
-			encodedBlob[id] = &core.BlobMessage{
-				BlobHeader: blobHeader,
-				Bundles: bundles,
-			}
+			encodedBlob.BlobHeader = blobHeader
+			encodedBlob.BundlesByOperator[id] = bundles
 		}
 		encodedBlobs[z] = encodedBlob
@@ -169,8 +169,10 @@ func checkBatch(t *testing.T, cst core.IndexedChainState, encodedBlob core.Encod
 
 	for id := range state.IndexedOperators {
 		val.UpdateOperatorID(id)
-		blobMessage := encodedBlob[id]
-		err := val.ValidateBlob(blobMessage, state.OperatorState)
+		err := val.ValidateBlob(&core.BlobMessage{
+			BlobHeader: encodedBlob.BlobHeader,
+			Bundles: encodedBlob.BundlesByOperator[id],
+		}, state.OperatorState)
 		assert.NoError(t, err)
 	}
 
@@ -189,7 +191,10 @@ func checkBatchByUniversalVerifier(t *testing.T, cst core.IndexedChainState, enc
 		val.UpdateOperatorID(id)
 		var blobMessages []*core.BlobMessage = make([]*core.BlobMessage, numBlob)
 		for z, encodedBlob := range encodedBlobs {
-			blobMessages[z] = encodedBlob[id]
+			blobMessages[z] = &core.BlobMessage{
+				BlobHeader: encodedBlob.BlobHeader,
+				Bundles: encodedBlob.BundlesByOperator[id],
+			}
 		}
 		err := val.ValidateBatch(blobMessages, state.OperatorState, pool)
 		assert.NoError(t, err)
diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go
index 0f71187e55..df68265593 100644
--- a/disperser/batcher/encoding_streamer.go
+++ b/disperser/batcher/encoding_streamer.go
@@ -462,24 +462,24 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
 		if _, ok := encodedBlobByKey[blobKey]; !ok {
 			metadataByKey[blobKey] = result.BlobMetadata
 			blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0)
-			encodedBlobByKey[blobKey] = make(core.EncodedBlob)
+			blobHeader := &core.BlobHeader{
+				BlobCommitments: *result.Commitment,
+			}
+			blobHeaderByKey[blobKey] = blobHeader
+			encodedBlobByKey[blobKey] = core.EncodedBlob{
+				BlobHeader: blobHeader,
+				BundlesByOperator: make(map[core.OperatorID]core.Bundles),
+			}
 		}
 
 		// Populate the assigned bundles
 		for opID, assignment := range result.Assignments {
-			blobMessage, ok := encodedBlobByKey[blobKey][opID]
+			bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID]
 			if !ok {
-				blobHeader := &core.BlobHeader{
-					BlobCommitments: *result.Commitment,
-				}
-				blobHeaderByKey[blobKey] = blobHeader
-				blobMessage = &core.BlobMessage{
-					BlobHeader: blobHeader,
-					Bundles: make(core.Bundles),
-				}
-				encodedBlobByKey[blobKey][opID] = blobMessage
+				encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles)
+				bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID]
 			}
-			blobMessage.Bundles[result.BlobQuorumInfo.QuorumID] = append(blobMessage.Bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...)
+			bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...)
 		}
 
 		blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo)
@@ -487,9 +487,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
 
 	// Populate the blob quorum infos
 	for blobKey, encodedBlob := range encodedBlobByKey {
-		for _, blobMessage := range encodedBlob {
-			blobMessage.BlobHeader.QuorumInfos = blobQuorums[blobKey]
-		}
+		encodedBlob.BlobHeader.QuorumInfos = blobQuorums[blobKey]
 	}
 
 	for blobKey, metadata := range metadataByKey {
diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go
index a22892356c..4dc710e069 100644
--- a/disperser/batcher/encoding_streamer_test.go
+++ b/disperser/batcher/encoding_streamer_test.go
@@ -431,31 +431,30 @@ func TestPartialBlob(t *testing.T) {
 
 	// Check EncodedBlobs
 	assert.Len(t, batch.EncodedBlobs, 1)
-	assert.Len(t, batch.EncodedBlobs[0], numOperators)
+	assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators)
 
 	encodedBlob1 := batch.EncodedBlobs[0]
 	assert.NotNil(t, encodedBlob1)
+	assert.NotNil(t, encodedBlob1.BlobHeader)
+	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments)
+	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.Commitment)
+	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.LengthProof)
+	assert.Equal(t, encodedBlob1.BlobHeader.BlobCommitments.Length, uint(48))
+	assert.Len(t, encodedBlob1.BlobHeader.QuorumInfos, 1)
+	assert.ElementsMatch(t, encodedBlob1.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
+		SecurityParam: core.SecurityParam{
+			QuorumID: 0,
+			AdversaryThreshold: 75,
+			QuorumThreshold: 100,
+		},
+		ChunkLength: 8,
+	}})
 
-	for _, blobMessage := range encodedBlob1 {
-		assert.NotNil(t, blobMessage)
-		assert.NotNil(t, blobMessage.BlobHeader)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.Commitment)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.LengthProof)
-		assert.Equal(t, blobMessage.BlobHeader.BlobCommitments.Length, uint(48))
-		assert.Len(t, blobMessage.BlobHeader.QuorumInfos, 1)
-		assert.ElementsMatch(t, blobMessage.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
-			SecurityParam: core.SecurityParam{
-				QuorumID: 0,
-				AdversaryThreshold: 75,
-				QuorumThreshold: 100,
-			},
-			ChunkLength: 8,
-		}})
-
-		assert.Contains(t, batch.BlobHeaders, blobMessage.BlobHeader)
-		assert.Len(t, blobMessage.Bundles, 1)
-		assert.Greater(t, len(blobMessage.Bundles[0]), 0)
+	assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader)
+	assert.Len(t, encodedBlob1.BundlesByOperator, numOperators)
+	for _, bundles := range encodedBlob1.BundlesByOperator {
+		assert.Len(t, bundles, 1)
+		assert.Greater(t, len(bundles[0]), 0)
 		break
 	}
 
@@ -646,7 +645,7 @@ func TestGetBatch(t *testing.T) {
 
 	// Check EncodedBlobs
 	assert.Len(t, batch.EncodedBlobs, 2)
-	assert.Len(t, batch.EncodedBlobs[0], numOperators)
+	assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators)
 
 	var encodedBlob1 core.EncodedBlob
 	var encodedBlob2 core.EncodedBlob
@@ -663,59 +662,57 @@
 	}
 	assert.NotNil(t, encodedBlob1)
 	assert.NotNil(t, encodedBlob2)
-	for _, blobMessage := range encodedBlob1 {
-		assert.NotNil(t, blobMessage)
-		assert.NotNil(t, blobMessage.BlobHeader)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.Commitment)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.LengthProof)
-		assert.Equal(t, blobMessage.BlobHeader.BlobCommitments.Length, uint(48))
-		assert.Len(t, blobMessage.BlobHeader.QuorumInfos, 2)
-		assert.ElementsMatch(t, blobMessage.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{
-			{
-				SecurityParam: core.SecurityParam{
-					QuorumID: 0,
-					AdversaryThreshold: 80,
-					QuorumThreshold: 100,
-				},
-				ChunkLength: 16,
-			},
-			{
-				SecurityParam: core.SecurityParam{
-					QuorumID: 1,
-					AdversaryThreshold: 70,
-					QuorumThreshold: 95,
-				},
-				ChunkLength: 8,
-			},
-		})
-		assert.Contains(t, batch.BlobHeaders, blobMessage.BlobHeader)
-		assert.Len(t, blobMessage.Bundles, 2)
-		assert.Greater(t, len(blobMessage.Bundles[0]), 0)
-		assert.Greater(t, len(blobMessage.Bundles[1]), 0)
-		break
-	}
-
-	for _, blobMessage := range encodedBlob2 {
-		assert.NotNil(t, blobMessage)
-		assert.NotNil(t, blobMessage.BlobHeader)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.Commitment)
-		assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.LengthProof)
-		assert.Equal(t, blobMessage.BlobHeader.BlobCommitments.Length, uint(48))
-		assert.Len(t, blobMessage.BlobHeader.QuorumInfos, 1)
-		assert.ElementsMatch(t, blobMessage.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
+	assert.NotNil(t, encodedBlob1.BlobHeader)
+	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments)
+	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.Commitment)
+	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.LengthProof)
+	assert.Equal(t, encodedBlob1.BlobHeader.BlobCommitments.Length, uint(48))
+	assert.Len(t, encodedBlob1.BlobHeader.QuorumInfos, 2)
+	assert.ElementsMatch(t, encodedBlob1.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{
+		{
 			SecurityParam: core.SecurityParam{
-				QuorumID: 2,
-				AdversaryThreshold: 75,
+				QuorumID: 0,
+				AdversaryThreshold: 80,
 				QuorumThreshold: 100,
 			},
+			ChunkLength: 16,
+		},
+		{
+			SecurityParam: core.SecurityParam{
+				QuorumID: 1,
+				AdversaryThreshold: 70,
+				QuorumThreshold: 95,
+			},
 			ChunkLength: 8,
-		}})
+		},
+	})
 
-		assert.Len(t, blobMessage.Bundles, 1)
-		assert.Greater(t, len(blobMessage.Bundles[core.QuorumID(2)]), 0)
+	assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader)
+	for _, bundles := range encodedBlob1.BundlesByOperator {
+		assert.Len(t, bundles, 2)
+		assert.Greater(t, len(bundles[0]), 0)
+		assert.Greater(t, len(bundles[1]), 0)
+		break
+	}
+
+	assert.NotNil(t, encodedBlob2.BlobHeader)
+	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments)
+	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.Commitment)
+	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.LengthProof)
+	assert.Equal(t, encodedBlob2.BlobHeader.BlobCommitments.Length, uint(48))
+	assert.Len(t, encodedBlob2.BlobHeader.QuorumInfos, 1)
+	assert.ElementsMatch(t, encodedBlob2.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
+		SecurityParam: core.SecurityParam{
+			QuorumID: 2,
+			AdversaryThreshold: 75,
+			QuorumThreshold: 100,
+		},
+		ChunkLength: 8,
+	}})
+	for _, bundles := range encodedBlob2.BundlesByOperator {
+		assert.Len(t, bundles, 1)
+		assert.Greater(t, len(bundles[core.QuorumID(2)]), 0)
 		break
 	}
 
 	assert.Len(t, batch.BlobHeaders, 2)
diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go
index 0b7ff97a46..71d7fac940 100644
--- a/disperser/batcher/grpc/dispatcher.go
+++ b/disperser/batcher/grpc/dispatcher.go
@@ -34,23 +34,27 @@ func NewDispatcher(cfg *Config, logger common.Logger) *dispatcher {
 
 var _ disperser.Dispatcher = (*dispatcher)(nil)
 
-func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SignerMessage {
+func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SignerMessage {
 	update := make(chan core.SignerMessage, len(state.IndexedOperators))
 
 	// Disperse
-	c.sendAllChunks(ctx, state, blobs, header, update)
+	c.sendAllChunks(ctx, state, blobs, batchHeader, update)
 
 	return update
 }
 
-func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader, update chan core.SignerMessage) {
+func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SignerMessage) {
 	for id, op := range state.IndexedOperators {
 		go func(op core.IndexedOperatorInfo, id core.OperatorID) {
-			blobMessages := make([]*core.BlobMessage, len(blobs))
-			for i, blob := range blobs {
-				blobMessages[i] = blob[id]
+			blobMessages := make([]*core.BlobMessage, 0)
+			for _, blob := range blobs {
+				blobMessages = append(blobMessages, &core.BlobMessage{
+					BlobHeader: blob.BlobHeader,
+					// Bundles will be empty if the operator is not in the quorums blob is dispersed on
+					Bundles: blob.BundlesByOperator[id],
+				})
 			}
-			sig, err := c.sendChunks(ctx, blobMessages, header, &op)
+			sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op)
 			if err != nil {
 				update <- core.SignerMessage{
 					Err: err,
@@ -73,7 +77,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
 	}
 }
 
-func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, header *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
+func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
 	// TODO Add secure Grpc
 
 	conn, err := grpc.Dial(
@@ -89,8 +93,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
 	gc := node.NewDispersalClient(conn)
 	ctx, cancel := context.WithTimeout(ctx, c.Timeout)
 	defer cancel()
-
-	request, totalSize, err := GetStoreChunksRequest(blobs, header)
+	request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader)
 	if err != nil {
 		return nil, err
 	}
@@ -108,7 +111,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
 	return sig, nil
 }
 
-func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, int64, error) {
+func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) {
 	blobs := make([]*node.Blob, len(blobMessages))
 	totalSize := int64(0)
 	for i, blob := range blobMessages {
@@ -121,7 +124,7 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchH
 	}
 
 	request := &node.StoreChunksRequest{
-		BatchHeader: getBatchHeaderMessage(header),
+		BatchHeader: getBatchHeaderMessage(batchHeader),
 		Blobs: blobs,
 	}
 
@@ -169,12 +172,19 @@ func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
 	if err != nil {
 		return nil, err
 	}
-	bundles := make([]*node.Bundle, len(blob.Bundles))
+	bundles := make([]*node.Bundle, len(quorumHeaders))
 	// the ordering of quorums in bundles must be same as in quorumHeaders
 	for i, quorumHeader := range quorumHeaders {
 		quorum := quorumHeader.QuorumId
-		bundles[i] = &node.Bundle{
-			Chunks: data[quorum],
+		if _, ok := blob.Bundles[uint8(quorum)]; ok {
+			bundles[i] = &node.Bundle{
+				Chunks: data[quorum],
+			}
+		} else {
+			bundles[i] = &node.Bundle{
+				// empty bundle for quorums operators are not part of
+				Chunks: make([][]byte, 0),
+			}
 		}
 	}
 
diff --git a/node/grpc/server_test.go b/node/grpc/server_test.go
index 6fabce4dbb..492f617299 100644
--- a/node/grpc/server_test.go
+++ b/node/grpc/server_test.go
@@ -177,6 +177,15 @@ func makeStoreChunksRequest(t *testing.T, quorumThreshold, adversaryThreshold ui
 		ChunkLength: 10,
 	}
 
+	quorumHeader1 := &core.BlobQuorumInfo{
+		SecurityParam: core.SecurityParam{
+			QuorumID: 1,
+			QuorumThreshold: 65,
+			AdversaryThreshold: 15,
+		},
+		ChunkLength: 10,
+	}
+
 	blobHeaders := []*core.BlobHeader{
 		{
 			BlobCommitments: encoding.BlobCommitments{
@@ -185,7 +194,7 @@ func makeStoreChunksRequest(t *testing.T, quorumThreshold, adversaryThreshold ui
 				LengthProof: &lengthProof,
 				Length: 48,
 			},
-			QuorumInfos: []*core.BlobQuorumInfo{quorumHeader},
+			QuorumInfos: []*core.BlobQuorumInfo{quorumHeader, quorumHeader1},
 		},
 		{
 			BlobCommitments: encoding.BlobCommitments{
@@ -208,8 +217,8 @@ func makeStoreChunksRequest(t *testing.T, quorumThreshold, adversaryThreshold ui
 	batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
 	assert.NoError(t, err)
 
-	blobHeaderProto0 := blobHeaderToProto(t, blobHeaders[0])
-	blobHeaderProto1 := blobHeaderToProto(t, blobHeaders[1])
+	blobHeaderProto0 := blobHeaderToProto(blobHeaders[0])
+	blobHeaderProto1 := blobHeaderToProto(blobHeaders[1])
 
 	req := &pb.StoreChunksRequest{
 		BatchHeader: &pb.BatchHeader{
@@ -223,6 +232,10 @@ func makeStoreChunksRequest(t *testing.T, quorumThreshold, adversaryThreshold ui
 				{
 					Chunks: [][]byte{encodedChunk},
 				},
+				{
+					// Empty bundle for the second quorum
+					Chunks: [][]byte{},
+				},
 			},
 		},
 		{
@@ -275,6 +288,14 @@ func TestRetrieveChunks(t *testing.T) {
 	chunk, err := new(encoding.Frame).Deserialize(encodedChunk)
 	assert.NoError(t, err)
 	assert.Equal(t, recovered, chunk)
+
+	retrievalReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
+		BatchHeaderHash: batchHeaderHash[:],
+		BlobIndex: 0,
+		QuorumId: 1,
+	})
+	assert.NoError(t, err)
+	assert.Empty(t, retrievalReply.GetChunks())
 }
 
 // If a batch fails to validate, it should not be stored in the store.
@@ -323,9 +344,33 @@ func TestGetBlobHeader(t *testing.T) {
 	ok, err := merkletree.VerifyProofUsing(blobHeaderHash[:], false, proof, [][]byte{batchRoot[:]}, keccak256.New())
 	assert.NoError(t, err)
 	assert.True(t, ok)
+
+	// Get blob header for the second quorum
+	reply, err = server.GetBlobHeader(context.Background(), &pb.GetBlobHeaderRequest{
+		BatchHeaderHash: batchHeaderHash[:],
+		BlobIndex: 0,
+		QuorumId: 1,
+	})
+	assert.NoError(t, err)
+
+	actual = reply.GetBlobHeader()
+	expected = protoBlobHeaders[0]
+	assert.True(t, proto.Equal(expected, actual))
+
+	blobHeaderHash, err = blobHeaders[0].GetBlobHeaderHash()
+	assert.NoError(t, err)
+
+	proof = &merkletree.Proof{
+		Hashes: reply.GetProof().GetHashes(),
+		Index: uint64(reply.GetProof().GetIndex()),
+	}
+
+	ok, err = merkletree.VerifyProofUsing(blobHeaderHash[:], false, proof, [][]byte{batchRoot[:]}, keccak256.New())
+	assert.NoError(t, err)
+	assert.True(t, ok)
 }
 
-func blobHeaderToProto(t *testing.T, blobHeader *core.BlobHeader) *pb.BlobHeader {
+func blobHeaderToProto(blobHeader *core.BlobHeader) *pb.BlobHeader {
 	var lengthCommitment, lengthProof pb.G2Commitment
 	if blobHeader.LengthCommitment != nil {
 		lengthCommitment.XA0 = blobHeader.LengthCommitment.X.A0.Marshal()
@@ -339,11 +384,14 @@ func blobHeaderToProto(t *testing.T, blobHeader *core.BlobHeader) *pb.BlobHeader
 		lengthProof.YA0 = blobHeader.LengthProof.Y.A0.Marshal()
 		lengthProof.YA1 = blobHeader.LengthProof.Y.A1.Marshal()
 	}
-	quorumHeader := &pb.BlobQuorumInfo{
-		QuorumId: uint32(blobHeader.QuorumInfos[0].QuorumID),
-		QuorumThreshold: uint32(blobHeader.QuorumInfos[0].QuorumThreshold),
-		AdversaryThreshold: uint32(blobHeader.QuorumInfos[0].AdversaryThreshold),
-		ChunkLength: uint32(blobHeader.QuorumInfos[0].ChunkLength),
+	quorumHeaders := make([]*pb.BlobQuorumInfo, len(blobHeader.QuorumInfos))
+	for i, quorumInfo := range blobHeader.QuorumInfos {
+		quorumHeaders[i] = &pb.BlobQuorumInfo{
+			QuorumId: uint32(quorumInfo.QuorumID),
+			QuorumThreshold: uint32(quorumInfo.QuorumThreshold),
+			AdversaryThreshold: uint32(quorumInfo.AdversaryThreshold),
+			ChunkLength: uint32(quorumInfo.ChunkLength),
+		}
 	}
 
 	return &pb.BlobHeader{
@@ -354,6 +402,6 @@ func blobHeaderToProto(t *testing.T, blobHeader *core.BlobHeader) *pb.BlobHeader
 		LengthCommitment: &lengthCommitment,
 		LengthProof: &lengthProof,
 		Length: uint32(blobHeader.Length),
-		QuorumHeaders: []*pb.BlobQuorumInfo{quorumHeader},
+		QuorumHeaders: quorumHeaders,
 	}
 }
diff --git a/node/grpc/utils.go b/node/grpc/utils.go
index 5db612e29a..c4a89eadf3 100644
--- a/node/grpc/utils.go
+++ b/node/grpc/utils.go
@@ -3,6 +3,7 @@ package grpc
 import (
 	"context"
 	"errors"
+	"fmt"
 	"reflect"
 
 	pb "github.com/Layr-Labs/eigenda/api/grpc/node"
@@ -30,11 +31,13 @@ func GetBatchHeader(in *pb.StoreChunksRequest) (*core.BatchHeader, error) {
 
 func GetBlobMessages(in *pb.StoreChunksRequest) ([]*core.BlobMessage, error) {
 	blobs := make([]*core.BlobMessage, len(in.GetBlobs()))
 	for i, blob := range in.GetBlobs() {
-
 		blobHeader, err := GetBlobHeaderFromProto(blob.GetHeader())
 		if err != nil {
 			return nil, err
 		}
+		if len(blob.GetBundles()) != len(blob.GetHeader().GetQuorumHeaders()) {
+			return nil, fmt.Errorf("number of quorum headers (%d) does not match number of bundles in blob message (%d)", len(blob.GetHeader().GetQuorumHeaders()), len(blob.GetBundles()))
+		}
 		bundles := make(map[core.QuorumID]core.Bundle, len(blob.GetBundles()))
 		for i, chunks := range blob.GetBundles() {