Skip to content

Commit

Permalink
v2 disperser GetBlobStatus endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 13, 2024
1 parent 94ab42f commit e09336f
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 7 deletions.
50 changes: 50 additions & 0 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254"
Expand Down Expand Up @@ -206,6 +207,13 @@ type BatchHeader struct {
ReferenceBlockNumber uint64
}

func (h *BatchHeader) ToProtobuf() *commonpb.BatchHeader {
return &commonpb.BatchHeader{
BatchRoot: h.BatchRoot[:],
ReferenceBlockNumber: h.ReferenceBlockNumber,
}
}

type Batch struct {
BatchHeader *BatchHeader
BlobCertificates []*BlobCertificate
Expand All @@ -228,6 +236,36 @@ type Attestation struct {
QuorumNumbers []core.QuorumID
}

func (a *Attestation) ToProtobuf() *disperserpb.Attestation {
nonSignerPubKeys := make([][]byte, len(a.NonSignerPubKeys))
for i, p := range a.NonSignerPubKeys {
pubkeyBytes := p.Bytes()
nonSignerPubKeys[i] = pubkeyBytes[:]
}

quorumAPKs := make([][]byte, len(a.QuorumAPKs))
for i, p := range a.QuorumAPKs {
apkBytes := p.Bytes()
quorumAPKs[i] = apkBytes[:]
}

quorumNumbers := make([]uint32, len(a.QuorumNumbers))
for i, q := range a.QuorumNumbers {
quorumNumbers[i] = uint32(q)
}

apkG2Bytes := a.APKG2.Bytes()
sigmaBytes := a.Sigma.Bytes()

return &disperserpb.Attestation{
NonSignerPubkeys: nonSignerPubKeys,
ApkG2: apkG2Bytes[:],
QuorumApks: quorumAPKs,
Sigma: sigmaBytes[:],
QuorumNumbers: quorumNumbers,
}
}

type BlobVerificationInfo struct {
*BatchHeader

Expand All @@ -236,6 +274,18 @@ type BlobVerificationInfo struct {
InclusionProof []byte
}

func (v *BlobVerificationInfo) ToProtobuf(blobCert *BlobCertificate) (*disperserpb.BlobVerificationInfo, error) {
blobCertProto, err := blobCert.ToProtobuf()
if err != nil {
return nil, err
}
return &disperserpb.BlobVerificationInfo{
BlobCertificate: blobCertProto,
BlobIndex: v.BlobIndex,
InclusionProof: v.InclusionProof,
}, nil
}

type BlobVersionParameters struct {
CodingRate uint32
ReconstructionThreshold float64
Expand Down
89 changes: 89 additions & 0 deletions disperser/apiserver/get_blob_status_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package apiserver

import (
"context"
"fmt"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
)

func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
if req.GetBlobKey() == nil || len(req.GetBlobKey()) != 32 {
return nil, api.NewErrorInvalidArg("invalid blob key")
}

blobKey, err := corev2.BytesToBlobKey(req.GetBlobKey())
if err != nil {
return nil, api.NewErrorInvalidArg("invalid blob key")
}

metadata, err := s.blobMetadataStore.GetBlobMetadata(ctx, blobKey)
if err != nil {
s.logger.Error("failed to get blob metadata", "err", err, "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob metadata: %s", err.Error()))
}

if metadata.BlobStatus != dispv2.Certified {
return &pb.BlobStatusReply{
Status: metadata.BlobStatus.ToProfobuf(),
}, nil
}

cert, _, err := s.blobMetadataStore.GetBlobCertificate(ctx, blobKey)
if err != nil {
s.logger.Error("failed to get blob certificate", "err", err, "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob certificate: %s", err.Error()))
}

// For certified blobs, include signed batch and blob verification info
blobVerificationInfos, err := s.blobMetadataStore.GetBlobVerificationInfos(ctx, blobKey)
if err != nil {
s.logger.Error("failed to get blob verification info", "err", err, "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob verification info: %s", err.Error()))
}

if len(blobVerificationInfos) == 0 {
s.logger.Error("no verification info found for certified blob", "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal("no verification info found")
}

if len(blobVerificationInfos) > 1 {
s.logger.Warn("multiple verification info found for certified blob", "blobKey", blobKey.Hex())
}

for _, verificationInfo := range blobVerificationInfos {
// get the signed batch from this verification info
batchHeaderHash, err := verificationInfo.BatchHeader.Hash()
if err != nil {
s.logger.Error("failed to get batch header hash", "err", err, "blobKey", blobKey.Hex())
continue
}
batchHeader, attestation, err := s.blobMetadataStore.GetSignedBatch(ctx, batchHeaderHash)
if err != nil {
s.logger.Error("failed to get signed batch", "err", err, "blobKey", blobKey.Hex())
continue
}

blobVerificationInfoProto, err := verificationInfo.ToProtobuf(cert)
if err != nil {
s.logger.Error("failed to convert blob verification info to protobuf", "err", err, "blobKey", blobKey.Hex())
continue
}

// return the first signed batch found
return &pb.BlobStatusReply{
Status: metadata.BlobStatus.ToProfobuf(),
SignedBatch: &pb.SignedBatch{
Header: batchHeader.ToProtobuf(),
NonSignerStakesAndSignature: attestation.ToProtobuf(),
},
BlobVerificationInfo: blobVerificationInfoProto,
}, nil
}

s.logger.Error("no signed batch found for certified blob", "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal("no signed batch found")
}
46 changes: 46 additions & 0 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
Expand All @@ -64,6 +65,7 @@ var (
localStackPort = "4569"
allowlistFile *os.File
testMaxBlobSize = 2 * 1024 * 1024
mockCommitment = encoding.BlobCommitments{}
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -657,6 +659,50 @@ func setup() {
}

dispersalServer = newTestServer(transactor, "setup")

var X1, Y1 fp.Element
X1 = *X1.SetBigInt(big.NewInt(1))
Y1 = *Y1.SetBigInt(big.NewInt(2))

var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element
_, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781")
if err != nil {
teardown()
panic("failed to create mock commitment: " + err.Error())
}
_, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634")
if err != nil {
teardown()
panic("failed to create mock commitment: " + err.Error())
}
_, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930")
if err != nil {
teardown()
panic("failed to create mock commitment: " + err.Error())
}
_, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531")
if err != nil {
teardown()
panic("failed to create mock commitment: " + err.Error())
}

var lengthProof, lengthCommitment bn254.G2Affine
lengthProof.X.A0 = lengthXA0
lengthProof.X.A1 = lengthXA1
lengthProof.Y.A0 = lengthYA0
lengthProof.Y.A1 = lengthYA1

lengthCommitment = lengthProof

mockCommitment = encoding.BlobCommitments{
Commitment: &encoding.G1Commitment{
X: X1,
Y: Y1,
},
LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment),
LengthProof: (*encoding.G2Commitment)(&lengthProof),
Length: 16,
}
}

func teardown() {
Expand Down
4 changes: 0 additions & 4 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,6 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
return nil
}

func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
return &pb.BlobStatusReply{}, api.NewErrorUnimplemented()
}

func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) {
return &pb.BlobCommitmentReply{}, api.NewErrorUnimplemented()
}
Expand Down
104 changes: 101 additions & 3 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/core"
auth "github.com/Layr-Labs/eigenda/core/auth/v2"
"github.com/Layr-Labs/eigenda/core/mock"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
"google.golang.org/grpc/peer"

pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

type testComponents struct {
Expand Down Expand Up @@ -201,10 +204,105 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) {

func TestV2GetBlobStatus(t *testing.T) {
c := newTestServerV2(t)
_, err := c.DispersalServerV2.GetBlobStatus(context.Background(), &pbv2.BlobStatusRequest{
BlobKey: []byte{1},
ctx := peer.NewContext(context.Background(), c.Peer)

blobHeader := &corev2.BlobHeader{
BlobVersion: 0,
BlobCommitments: mockCommitment,
QuorumNumbers: []core.QuorumID{0},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x1234",
BinIndex: 0,
CumulativePayment: big.NewInt(532),
},
}
blobKey, err := blobHeader.BlobKey()
require.NoError(t, err)
now := time.Now()
metadata := &dispv2.BlobMetadata{
BlobHeader: blobHeader,
BlobStatus: dispv2.Queued,
Expiry: uint64(now.Add(time.Hour).Unix()),
NumRetries: 0,
UpdatedAt: uint64(now.UnixNano()),
}
err = c.BlobMetadataStore.PutBlobMetadata(ctx, metadata)
require.NoError(t, err)
blobCert := &corev2.BlobCertificate{
BlobHeader: blobHeader,
RelayKeys: []corev2.RelayKey{0, 1, 2},
}
err = c.BlobMetadataStore.PutBlobCertificate(ctx, blobCert, nil)
require.NoError(t, err)

// Non ceritified blob status
status, err := c.DispersalServerV2.GetBlobStatus(ctx, &pbv2.BlobStatusRequest{
BlobKey: blobKey[:],
})
assert.ErrorContains(t, err, "not implemented")
require.NoError(t, err)
require.Equal(t, pbv2.BlobStatus_QUEUED, status.Status)
err = c.BlobMetadataStore.UpdateBlobStatus(ctx, blobKey, dispv2.Encoded)
require.NoError(t, err)
status, err = c.DispersalServerV2.GetBlobStatus(ctx, &pbv2.BlobStatusRequest{
BlobKey: blobKey[:],
})
require.NoError(t, err)
require.Equal(t, pbv2.BlobStatus_ENCODED, status.Status)

// Certified blob status
err = c.BlobMetadataStore.UpdateBlobStatus(ctx, blobKey, dispv2.Certified)
require.NoError(t, err)
batchHeader := &corev2.BatchHeader{
BatchRoot: [32]byte{1, 2, 3},
ReferenceBlockNumber: 100,
}
err = c.BlobMetadataStore.PutBatchHeader(ctx, batchHeader)
require.NoError(t, err)
verificationInfo0 := &corev2.BlobVerificationInfo{
BatchHeader: batchHeader,
BlobKey: blobKey,
BlobIndex: 123,
InclusionProof: []byte("inclusion proof"),
}
err = c.BlobMetadataStore.PutBlobVerificationInfo(ctx, verificationInfo0)
require.NoError(t, err)

attestation := &corev2.Attestation{
BatchHeader: batchHeader,
NonSignerPubKeys: []*core.G1Point{
core.NewG1Point(big.NewInt(1), big.NewInt(2)),
core.NewG1Point(big.NewInt(3), big.NewInt(4)),
},
APKG2: &core.G2Point{
G2Affine: &bn254.G2Affine{
X: mockCommitment.LengthCommitment.X,
Y: mockCommitment.LengthCommitment.Y,
},
},
Sigma: &core.Signature{
G1Point: core.NewG1Point(big.NewInt(5), big.NewInt(6)),
},
}
err = c.BlobMetadataStore.PutAttestation(ctx, attestation)
require.NoError(t, err)

reply, err := c.DispersalServerV2.GetBlobStatus(context.Background(), &pbv2.BlobStatusRequest{
BlobKey: blobKey[:],
})
require.NoError(t, err)
require.Equal(t, pbv2.BlobStatus_CERTIFIED, reply.GetStatus())
blobHeaderProto, err := blobHeader.ToProtobuf()
require.NoError(t, err)
blobCertProto, err := blobCert.ToProtobuf()
require.NoError(t, err)
require.Equal(t, blobHeaderProto, reply.GetBlobVerificationInfo().GetBlobCertificate().GetBlobHeader())
require.Equal(t, blobCertProto.Relays, reply.GetBlobVerificationInfo().GetBlobCertificate().GetRelays())
require.Equal(t, verificationInfo0.BlobIndex, reply.GetBlobVerificationInfo().GetBlobIndex())
require.Equal(t, verificationInfo0.InclusionProof, reply.GetBlobVerificationInfo().GetInclusionProof())
require.Equal(t, batchHeader.BatchRoot[:], reply.GetSignedBatch().GetHeader().BatchRoot)
require.Equal(t, batchHeader.ReferenceBlockNumber, reply.GetSignedBatch().GetHeader().ReferenceBlockNumber)
attestationProto := attestation.ToProtobuf()
require.Equal(t, attestationProto, reply.GetSignedBatch().GetNonSignerStakesAndSignature())
}

func TestV2GetBlobCommitment(t *testing.T) {
Expand Down
Loading

0 comments on commit e09336f

Please sign in to comment.