Skip to content

Commit

Permalink
Fix: Send only blobs relevant to operators (Layr-Labs#301)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Mar 4, 2024
1 parent 259a5e0 commit 780e1be
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 133 deletions.
2 changes: 1 addition & 1 deletion clients/mock/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ func (c *MockNodeClient) GetChunks(
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: nil,
Chunks: encodedBlob[opID].Bundles[quorumID],
Chunks: encodedBlob.BundlesByOperator[opID][quorumID],
}
}
27 changes: 14 additions & 13 deletions clients/tests/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
}

var (
indexedChainState core.IndexedChainState
chainState core.ChainState
indexer *indexermock.MockIndexer
operatorState *core.OperatorState
nodeClient *clientsmock.MockNodeClient
coordinator *core.StdAssignmentCoordinator
retrievalClient clients.RetrievalClient
blobHeader *core.BlobHeader
encodedBlob core.EncodedBlob = make(core.EncodedBlob)
indexedChainState core.IndexedChainState
chainState core.ChainState
indexer *indexermock.MockIndexer
operatorState *core.OperatorState
nodeClient *clientsmock.MockNodeClient
coordinator *core.StdAssignmentCoordinator
retrievalClient clients.RetrievalClient
blobHeader *core.BlobHeader
encodedBlob core.EncodedBlob = core.EncodedBlob{
BlobHeader: nil,
BundlesByOperator: make(map[core.OperatorID]core.Bundles),
}
batchHeaderHash [32]byte
batchRoot [32]byte
gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.")
Expand Down Expand Up @@ -189,10 +192,8 @@ func setup(t *testing.T) {
for id, assignment := range assignments {
bundles := make(map[core.QuorumID]core.Bundle, len(blobHeader.QuorumInfos))
bundles[quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks]
encodedBlob[id] = &core.BlobMessage{
BlobHeader: blobHeader,
Bundles: bundles,
}
encodedBlob.BlobHeader = blobHeader
encodedBlob.BundlesByOperator[id] = bundles
}

}
Expand Down
5 changes: 4 additions & 1 deletion core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ type BatchHeader struct {
}

// EncodedBlob contains the messages to be sent to a group of DA nodes corresponding to a single blob
type EncodedBlob = map[OperatorID]*BlobMessage
type EncodedBlob struct {
BlobHeader *BlobHeader
BundlesByOperator map[OperatorID]Bundles
}

// A Bundle is the collection of chunks associated with a single blob, for a single operator and a single quorum.
type Bundle = []*encoding.Frame
Expand Down
21 changes: 13 additions & 8 deletions core/test/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func prepareBatch(t *testing.T, cst core.IndexedChainState, blobs []core.Blob, q
QuorumInfos: []*core.BlobQuorumInfo{quorumHeader},
}

var encodedBlob core.EncodedBlob = make(map[core.OperatorID]*core.BlobMessage, len(assignments))
encodedBlob := core.EncodedBlob{
BundlesByOperator: make(map[core.OperatorID]core.Bundles),
}
for id, assignment := range assignments {
bundles := map[core.QuorumID]core.Bundle{
quorumID: chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks],
}
encodedBlob[id] = &core.BlobMessage{
BlobHeader: blobHeader,
Bundles: bundles,
}
encodedBlob.BlobHeader = blobHeader
encodedBlob.BundlesByOperator[id] = bundles
}
encodedBlobs[z] = encodedBlob

Expand All @@ -169,8 +169,10 @@ func checkBatch(t *testing.T, cst core.IndexedChainState, encodedBlob core.Encod

for id := range state.IndexedOperators {
val.UpdateOperatorID(id)
blobMessage := encodedBlob[id]
err := val.ValidateBlob(blobMessage, state.OperatorState)
err := val.ValidateBlob(&core.BlobMessage{
BlobHeader: encodedBlob.BlobHeader,
Bundles: encodedBlob.BundlesByOperator[id],
}, state.OperatorState)
assert.NoError(t, err)
}

Expand All @@ -189,7 +191,10 @@ func checkBatchByUniversalVerifier(t *testing.T, cst core.IndexedChainState, enc
val.UpdateOperatorID(id)
var blobMessages []*core.BlobMessage = make([]*core.BlobMessage, numBlob)
for z, encodedBlob := range encodedBlobs {
blobMessages[z] = encodedBlob[id]
blobMessages[z] = &core.BlobMessage{
BlobHeader: encodedBlob.BlobHeader,
Bundles: encodedBlob.BundlesByOperator[id],
}
}
err := val.ValidateBatch(blobMessages, state.OperatorState, pool)
assert.NoError(t, err)
Expand Down
28 changes: 13 additions & 15 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,34 +462,32 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
if _, ok := encodedBlobByKey[blobKey]; !ok {
metadataByKey[blobKey] = result.BlobMetadata
blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0)
encodedBlobByKey[blobKey] = make(core.EncodedBlob)
blobHeader := &core.BlobHeader{
BlobCommitments: *result.Commitment,
}
blobHeaderByKey[blobKey] = blobHeader
encodedBlobByKey[blobKey] = core.EncodedBlob{
BlobHeader: blobHeader,
BundlesByOperator: make(map[core.OperatorID]core.Bundles),
}
}

// Populate the assigned bundles
for opID, assignment := range result.Assignments {
blobMessage, ok := encodedBlobByKey[blobKey][opID]
bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID]
if !ok {
blobHeader := &core.BlobHeader{
BlobCommitments: *result.Commitment,
}
blobHeaderByKey[blobKey] = blobHeader
blobMessage = &core.BlobMessage{
BlobHeader: blobHeader,
Bundles: make(core.Bundles),
}
encodedBlobByKey[blobKey][opID] = blobMessage
encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles)
bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID]
}
blobMessage.Bundles[result.BlobQuorumInfo.QuorumID] = append(blobMessage.Bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...)
bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...)
}

blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo)
}

// Populate the blob quorum infos
for blobKey, encodedBlob := range encodedBlobByKey {
for _, blobMessage := range encodedBlob {
blobMessage.BlobHeader.QuorumInfos = blobQuorums[blobKey]
}
encodedBlob.BlobHeader.QuorumInfos = blobQuorums[blobKey]
}

for blobKey, metadata := range metadataByKey {
Expand Down
135 changes: 66 additions & 69 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,31 +431,30 @@ func TestPartialBlob(t *testing.T) {

// Check EncodedBlobs
assert.Len(t, batch.EncodedBlobs, 1)
assert.Len(t, batch.EncodedBlobs[0], numOperators)
assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators)

encodedBlob1 := batch.EncodedBlobs[0]
assert.NotNil(t, encodedBlob1)
assert.NotNil(t, encodedBlob1.BlobHeader)
assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments)
assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.Commitment)
assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.LengthProof)
assert.Equal(t, encodedBlob1.BlobHeader.BlobCommitments.Length, uint(48))
assert.Len(t, encodedBlob1.BlobHeader.QuorumInfos, 1)
assert.ElementsMatch(t, encodedBlob1.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
SecurityParam: core.SecurityParam{
QuorumID: 0,
AdversaryThreshold: 75,
QuorumThreshold: 100,
},
ChunkLength: 8,
}})

for _, blobMessage := range encodedBlob1 {
assert.NotNil(t, blobMessage)
assert.NotNil(t, blobMessage.BlobHeader)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.Commitment)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.LengthProof)
assert.Equal(t, blobMessage.BlobHeader.BlobCommitments.Length, uint(48))
assert.Len(t, blobMessage.BlobHeader.QuorumInfos, 1)
assert.ElementsMatch(t, blobMessage.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
SecurityParam: core.SecurityParam{
QuorumID: 0,
AdversaryThreshold: 75,
QuorumThreshold: 100,
},
ChunkLength: 8,
}})

assert.Contains(t, batch.BlobHeaders, blobMessage.BlobHeader)
assert.Len(t, blobMessage.Bundles, 1)
assert.Greater(t, len(blobMessage.Bundles[0]), 0)
assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader)
assert.Len(t, encodedBlob1.BundlesByOperator, numOperators)
for _, bundles := range encodedBlob1.BundlesByOperator {
assert.Len(t, bundles, 1)
assert.Greater(t, len(bundles[0]), 0)
break
}

Expand Down Expand Up @@ -646,7 +645,7 @@ func TestGetBatch(t *testing.T) {

// Check EncodedBlobs
assert.Len(t, batch.EncodedBlobs, 2)
assert.Len(t, batch.EncodedBlobs[0], numOperators)
assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators)

var encodedBlob1 core.EncodedBlob
var encodedBlob2 core.EncodedBlob
Expand All @@ -663,59 +662,57 @@ func TestGetBatch(t *testing.T) {
}
assert.NotNil(t, encodedBlob1)
assert.NotNil(t, encodedBlob2)
for _, blobMessage := range encodedBlob1 {
assert.NotNil(t, blobMessage)
assert.NotNil(t, blobMessage.BlobHeader)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.Commitment)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.LengthProof)
assert.Equal(t, blobMessage.BlobHeader.BlobCommitments.Length, uint(48))
assert.Len(t, blobMessage.BlobHeader.QuorumInfos, 2)
assert.ElementsMatch(t, blobMessage.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{
{
SecurityParam: core.SecurityParam{
QuorumID: 0,
AdversaryThreshold: 80,
QuorumThreshold: 100,
},
ChunkLength: 16,
},
{
SecurityParam: core.SecurityParam{
QuorumID: 1,
AdversaryThreshold: 70,
QuorumThreshold: 95,
},
ChunkLength: 8,
},
})

assert.Contains(t, batch.BlobHeaders, blobMessage.BlobHeader)
assert.Len(t, blobMessage.Bundles, 2)
assert.Greater(t, len(blobMessage.Bundles[0]), 0)
assert.Greater(t, len(blobMessage.Bundles[1]), 0)
break
}

for _, blobMessage := range encodedBlob2 {
assert.NotNil(t, blobMessage)
assert.NotNil(t, blobMessage.BlobHeader)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.Commitment)
assert.NotNil(t, blobMessage.BlobHeader.BlobCommitments.LengthProof)
assert.Equal(t, blobMessage.BlobHeader.BlobCommitments.Length, uint(48))
assert.Len(t, blobMessage.BlobHeader.QuorumInfos, 1)
assert.ElementsMatch(t, blobMessage.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
assert.NotNil(t, encodedBlob1.BlobHeader)
assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments)
assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.Commitment)
assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.LengthProof)
assert.Equal(t, encodedBlob1.BlobHeader.BlobCommitments.Length, uint(48))
assert.Len(t, encodedBlob1.BlobHeader.QuorumInfos, 2)
assert.ElementsMatch(t, encodedBlob1.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{
{
SecurityParam: core.SecurityParam{
QuorumID: 2,
AdversaryThreshold: 75,
QuorumID: 0,
AdversaryThreshold: 80,
QuorumThreshold: 100,
},
ChunkLength: 16,
},
{
SecurityParam: core.SecurityParam{
QuorumID: 1,
AdversaryThreshold: 70,
QuorumThreshold: 95,
},
ChunkLength: 8,
}})
},
})

assert.Len(t, blobMessage.Bundles, 1)
assert.Greater(t, len(blobMessage.Bundles[core.QuorumID(2)]), 0)
assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader)
for _, bundles := range encodedBlob1.BundlesByOperator {
assert.Len(t, bundles, 2)
assert.Greater(t, len(bundles[0]), 0)
assert.Greater(t, len(bundles[1]), 0)
break
}

assert.NotNil(t, encodedBlob2.BlobHeader)
assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments)
assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.Commitment)
assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.LengthProof)
assert.Equal(t, encodedBlob2.BlobHeader.BlobCommitments.Length, uint(48))
assert.Len(t, encodedBlob2.BlobHeader.QuorumInfos, 1)
assert.ElementsMatch(t, encodedBlob2.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
SecurityParam: core.SecurityParam{
QuorumID: 2,
AdversaryThreshold: 75,
QuorumThreshold: 100,
},
ChunkLength: 8,
}})
for _, bundles := range encodedBlob2.BundlesByOperator {
assert.Len(t, bundles, 1)
assert.Greater(t, len(bundles[core.QuorumID(2)]), 0)
break
}
assert.Len(t, batch.BlobHeaders, 2)
Expand Down
Loading

0 comments on commit 780e1be

Please sign in to comment.