Skip to content

Commit

Permalink
[v2] Variety of fixes in disperser (Layr-Labs#907)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 18, 2024
1 parent 71c6ea6 commit ab13bd7
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 20 deletions.
6 changes: 4 additions & 2 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,10 @@ func SerializeMerkleProof(proof *merkletree.Proof) []byte {
return proofBytes
}

func DeserializeMerkleProof(data []byte) (*merkletree.Proof, error) {
proof := &merkletree.Proof{}
func DeserializeMerkleProof(data []byte, index uint64) (*merkletree.Proof, error) {
proof := &merkletree.Proof{
Index: index,
}
if len(data)%32 != 0 {
return nil, fmt.Errorf("invalid proof length")
}
Expand Down
25 changes: 23 additions & 2 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type BlobHeader struct {
Signature []byte
}

func NewBlobHeader(proto *commonpb.BlobHeader) (*BlobHeader, error) {
func BlobHeaderFromProtobuf(proto *commonpb.BlobHeader) (*BlobHeader, error) {
commitment, err := new(encoding.G1Commitment).Deserialize(proto.GetCommitment().GetCommitment())
if err != nil {
return nil, err
Expand Down Expand Up @@ -202,6 +202,27 @@ func (c *BlobCertificate) ToProtobuf() (*commonpb.BlobCertificate, error) {
}, nil
}

// BlobCertificateFromProtobuf converts a protobuf BlobCertificate into its
// core representation. It returns an error if the certificate is nil, is
// missing its blob header, or if the blob header fails to deserialize.
func BlobCertificateFromProtobuf(proto *commonpb.BlobCertificate) (*BlobCertificate, error) {
	// Guard the outer message as well as the nested header; GetBlobHeader()
	// tolerates a nil receiver, but being explicit gives a clearer error.
	if proto == nil {
		return nil, errors.New("nil blob certificate")
	}
	if proto.GetBlobHeader() == nil {
		return nil, errors.New("missing blob header in blob certificate")
	}

	blobHeader, err := BlobHeaderFromProtobuf(proto.GetBlobHeader())
	if err != nil {
		// Wrap with %w so callers can inspect the cause via errors.Is/As.
		return nil, fmt.Errorf("failed to create blob header: %w", err)
	}

	relayKeys := make([]RelayKey, len(proto.GetRelays()))
	for i, r := range proto.GetRelays() {
		relayKeys[i] = RelayKey(r)
	}

	return &BlobCertificate{
		BlobHeader: blobHeader,
		RelayKeys:  relayKeys,
	}, nil
}

type BatchHeader struct {
// BatchRoot is the root of a Merkle tree whose leaves are the keys of the blobs in the batch
BatchRoot [32]byte
Expand Down Expand Up @@ -272,7 +293,7 @@ func BatchFromProtobuf(proto *commonpb.Batch) (*Batch, error) {

blobCerts := make([]*BlobCertificate, len(proto.GetBlobCertificates()))
for i, cert := range proto.GetBlobCertificates() {
blobHeader, err := NewBlobHeader(cert.GetBlobHeader())
blobHeader, err := BlobHeaderFromProtobuf(cert.GetBlobHeader())
if err != nil {
return nil, fmt.Errorf("failed to create blob header: %v", err)
}
Expand Down
61 changes: 61 additions & 0 deletions core/v2/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,64 @@ func TestConvertBatchToFromProtobuf(t *testing.T) {

assert.Equal(t, batch, newBatch)
}

// TestConvertBlobHeaderToFromProtobuf verifies that a BlobHeader survives a
// round trip through its protobuf representation unchanged.
func TestConvertBlobHeaderToFromProtobuf(t *testing.T) {
	padded := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
	commitments, err := p.GetCommitments(padded)
	if err != nil {
		t.Fatal(err)
	}

	payment := core.PaymentMetadata{
		AccountID:         "0x123",
		BinIndex:          5,
		CumulativePayment: big.NewInt(100),
	}
	original := &v2.BlobHeader{
		BlobVersion:     0,
		BlobCommitments: commitments,
		QuorumNumbers:   []core.QuorumID{0, 1},
		PaymentMetadata: payment,
		Signature:       []byte{1, 2, 3},
	}

	// Serialize to protobuf, then deserialize back.
	serialized, err := original.ToProtobuf()
	assert.NoError(t, err)

	roundTripped, err := v2.BlobHeaderFromProtobuf(serialized)
	assert.NoError(t, err)

	assert.Equal(t, original, roundTripped)
}

// TestConvertBlobCertToFromProtobuf verifies that a BlobCertificate survives
// a round trip through its protobuf representation unchanged.
func TestConvertBlobCertToFromProtobuf(t *testing.T) {
	padded := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
	commitments, err := p.GetCommitments(padded)
	if err != nil {
		t.Fatal(err)
	}

	header := &v2.BlobHeader{
		BlobVersion:     0,
		BlobCommitments: commitments,
		QuorumNumbers:   []core.QuorumID{0, 1},
		PaymentMetadata: core.PaymentMetadata{
			AccountID:         "0x123",
			BinIndex:          5,
			CumulativePayment: big.NewInt(100),
		},
		Signature: []byte{1, 2, 3},
	}
	original := &v2.BlobCertificate{
		BlobHeader: header,
		RelayKeys:  []v2.RelayKey{0, 1},
	}

	// Serialize to protobuf, then deserialize back.
	serialized, err := original.ToProtobuf()
	assert.NoError(t, err)

	roundTripped, err := v2.BlobCertificateFromProtobuf(serialized)
	assert.NoError(t, err)

	assert.Equal(t, original, roundTripped)
}
4 changes: 2 additions & 2 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
}

data := req.GetData()
blobHeader, err := corev2.NewBlobHeader(req.GetBlobHeader())
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
if err != nil {
return nil, api.NewErrorInternal(err.Error())
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (s *DispersalServerV2) validateDispersalRequest(req *pb.DisperseBlobRequest
return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob version %d; valid blob versions are: %v", blobHeaderProto.GetVersion(), validVersions))
}

blobHeader, err := corev2.NewBlobHeader(blobHeaderProto)
blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto)
if err != nil {
return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob header: %s", err.Error()))
}
Expand Down
2 changes: 1 addition & 1 deletion disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestV2DisperseBlob(t *testing.T) {
CumulativePayment: big.NewInt(100).Bytes(),
},
}
blobHeader, err := corev2.NewBlobHeader(blobHeaderProto)
blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto)
assert.NoError(t, err)
signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
sig, err := signer.SignBlobRequest(blobHeader)
Expand Down
21 changes: 20 additions & 1 deletion disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ func NewConfig(ctx *cli.Context) (Config, error) {
return Config{}, err
}

encodingConfig := kzg.ReadCLIConfig(ctx)
if version == uint(V2) {
if encodingConfig.G1Path == "" {
return Config{}, fmt.Errorf("G1Path must be specified for disperser version 2")
}
if encodingConfig.G2Path == "" {
return Config{}, fmt.Errorf("G2Path must be specified for disperser version 2")
}
if encodingConfig.CacheDir == "" {
return Config{}, fmt.Errorf("CacheDir must be specified for disperser version 2")
}
if encodingConfig.SRSOrder <= 0 {
return Config{}, fmt.Errorf("SRSOrder must be specified for disperser version 2")
}
if encodingConfig.SRSNumberToLoad <= 0 {
return Config{}, fmt.Errorf("SRSNumberToLoad must be specified for disperser version 2")
}
}

config := Config{
DisperserVersion: DisperserVersion(version),
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
Expand All @@ -90,7 +109,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
},
RatelimiterConfig: ratelimiterConfig,
RateConfig: rateConfig,
EncodingConfig: kzg.ReadCLIConfig(ctx),
EncodingConfig: encodingConfig,
EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name),
EnablePaymentMeterer: ctx.GlobalBool(flags.EnablePaymentMeterer.Name),
ReservationsTableName: ctx.GlobalString(flags.ReservationsTableName.Name),
Expand Down
6 changes: 4 additions & 2 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
newLastUpdatedAt := d.lastUpdatedAt
for i, metadata := range blobMetadatas {
if metadata == nil || metadata.BlobHeader == nil {
return nil, fmt.Errorf("invalid blob metadata")
Expand All @@ -252,8 +253,8 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob key: %w", err)
}
keys[i] = blobKey
if metadata.UpdatedAt > d.lastUpdatedAt {
d.lastUpdatedAt = metadata.UpdatedAt
if metadata.UpdatedAt > newLastUpdatedAt {
newLastUpdatedAt = metadata.UpdatedAt
}
}

Expand Down Expand Up @@ -343,6 +344,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to put blob verification infos: %w", err)
}

d.lastUpdatedAt = newLastUpdatedAt
return &batchData{
Batch: &corev2.Batch{
BatchHeader: batchHeader,
Expand Down
2 changes: 1 addition & 1 deletion disperser/controller/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestDispatcherNewBatch(t *testing.T) {
require.Equal(t, bh, vi0.BatchHeader)
certHash, err := cert.Hash()
require.NoError(t, err)
proof, err := core.DeserializeMerkleProof(vi0.InclusionProof)
proof, err := core.DeserializeMerkleProof(vi0.InclusionProof, uint64(vi0.BlobIndex))
require.NoError(t, err)
verified, err := merkletree.VerifyProofUsing(certHash[:], false, proof, [][]byte{vi0.BatchRoot[:]}, keccak256.New())
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
}

for _, blob := range blobMetadatas {
blob := blob
blobKey, err := blob.BlobHeader.BlobKey()
if err != nil {
e.logger.Error("failed to get blob key", "err", err, "requestedAt", blob.RequestedAt, "paymentMetadata", blob.BlobHeader.PaymentMetadata)
Expand Down
6 changes: 6 additions & 0 deletions disperser/encoder/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p
encodingStart := time.Now()
frames, err := s.prover.GetFrames(data, encodingParams)
if err != nil {
s.logger.Error("failed to encode frames", "error", err)
return nil, status.Errorf(codes.Internal, "encoding failed: %v", err)
}
s.logger.Info("encoding frames", "duration", time.Since(encodingStart))
Expand Down Expand Up @@ -204,6 +205,11 @@ func (s *EncoderServerV2) validateAndParseRequest(req *pb.EncodeBlobRequest) (co
NumChunks: req.EncodingParams.NumChunks,
}

err = encoding.ValidateEncodingParams(params, s.prover.GetSRSOrder())
if err != nil {
return blobKey, params, status.Errorf(codes.InvalidArgument, "invalid encoding parameters: %v", err)
}

return blobKey, params, nil
}

Expand Down
2 changes: 2 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Prover interface {
GetFrames(data []byte, params EncodingParams) ([]*Frame, error)

GetMultiFrameProofs(data []byte, params EncodingParams) ([]Proof, error)

GetSRSOrder() uint64
}

type Verifier interface {
Expand Down
4 changes: 4 additions & 0 deletions encoding/kzg/prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func (g *Prover) GetKzgEncoder(params encoding.EncodingParams) (*ParametrizedPro
return enc, err
}

// GetSRSOrder returns the order of the structured reference string (SRS)
// configured on this prover, satisfying the encoding.Prover interface so
// callers can validate encoding parameters against the available SRS.
func (g *Prover) GetSRSOrder() uint64 {
	return g.SRSOrder
}

// Detect the precomputed table from the specified directory
// the file name follow the name convention of
//
Expand Down
5 changes: 5 additions & 0 deletions encoding/mock/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (e *MockEncoder) GetMultiFrameProofs(data []byte, params encoding.EncodingP
return args.Get(0).([]encoding.Proof), args.Error(1)
}

// GetSRSOrder returns the SRS order stubbed for this mock via the testify
// expectation API (e.g. e.On("GetSRSOrder").Return(...)).
func (e *MockEncoder) GetSRSOrder() uint64 {
	args := e.Called()
	return args.Get(0).(uint64)
}

func (e *MockEncoder) VerifyFrames(chunks []*encoding.Frame, indices []encoding.ChunkNumber, commitments encoding.BlobCommitments, params encoding.EncodingParams) error {
args := e.Called(chunks, indices, commitments, params)
time.Sleep(e.Delay)
Expand Down
19 changes: 10 additions & 9 deletions relay/relay_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ package relay
import (
"context"
"fmt"
"log"
"math/big"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"

pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common"
pbcommonv2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
"github.com/Layr-Labs/eigenda/common"
Expand All @@ -24,14 +33,6 @@ import (
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"log"
"math/big"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
)

var (
Expand Down Expand Up @@ -196,7 +197,7 @@ func randomBlob(t *testing.T) (*v2.BlobHeader, []byte) {
CumulativePayment: big.NewInt(100).Bytes(),
},
}
blobHeader, err := v2.NewBlobHeader(blobHeaderProto)
blobHeader, err := v2.BlobHeaderFromProtobuf(blobHeaderProto)
require.NoError(t, err)

return blobHeader, data
Expand Down

0 comments on commit ab13bd7

Please sign in to comment.