Skip to content

Commit

Permalink
node client v2
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 5, 2024
1 parent 2fa16c8 commit 03807aa
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 5 deletions.
33 changes: 33 additions & 0 deletions api/clients/mock/node_client_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

type MockNodeClientV2 struct {
mock.Mock
}

var _ clients.NodeClientV2 = (*MockNodeClientV2)(nil)

func NewNodeClientV2() *MockNodeClientV2 {
return &MockNodeClientV2{}
}

func (c *MockNodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) ([]byte, error) {
args := c.Called()
var signature []byte
if args.Get(0) != nil {
signature = (args.Get(0)).([]byte)
}
return signature, args.Error(1)
}

func (c *MockNodeClientV2) Close() error {
args := c.Called()
return args.Error(0)
}
109 changes: 109 additions & 0 deletions api/clients/node_client_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package clients

import (
"context"
"fmt"
"sync"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
nodegrpc "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"google.golang.org/grpc"
)

type NodeClientV2Config struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type NodeClientV2 interface {
StoreChunks(ctx context.Context, certs *corev2.Batch) (signature []byte, err error)
Close() error
}

type nodeClientV2 struct {
config *NodeClientV2Config
initOnce sync.Once
conn *grpc.ClientConn

dispersalClient nodegrpc.DispersalClient
}

var _ NodeClientV2 = (*nodeClientV2)(nil)

func NewNodeClientV2(config *NodeClientV2Config) (*nodeClientV2, error) {
if config == nil || config.Hostname == "" || config.Port == "" {
return nil, fmt.Errorf("invalid config: %v", config)
}
return &nodeClientV2{
config: config,
}, nil
}

func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (signature []byte, err error) {
if len(batch.BlobCertificates) == 0 {
return nil, fmt.Errorf("no blob certificates in the batch")
}

if err = c.initOnceGrpcConnection(); err != nil {
return nil, err
}

blobCerts := make([]*commonpb.BlobCertificate, len(batch.BlobCertificates))
for i, cert := range batch.BlobCertificates {
blobCerts[i], err = cert.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob certificate to protobuf: %v", err)
}
}

// Call the gRPC method to store chunks
response, err := c.dispersalClient.StoreChunks(ctx, &nodegrpc.StoreChunksRequest{
Batch: &commonpb.Batch{
Header: &commonpb.BatchHeader{
BatchRoot: batch.BatchHeader.BatchRoot[:],
ReferenceBlockNumber: batch.BatchHeader.ReferenceBlockNumber,
},
BlobCertificates: blobCerts,
},
})
if err != nil {
return nil, err
}

// Extract signatures from the response
if response == nil {
return nil, fmt.Errorf("received nil response from StoreChunks")
}

return response.GetSignature(), nil
}

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *nodeClientV2) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
c.dispersalClient = nil
return err
}
return nil
}

func (c *nodeClientV2) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
conn, err := grpc.Dial(addr, dialOptions...)
if err != nil {
initErr = err
return
}
c.conn = conn
c.dispersalClient = nodegrpc.NewDispersalClient(conn)
})
return initErr
}
8 changes: 8 additions & 0 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,14 @@ func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeVal
return nil
}

func (pm *PaymentMetadata) ToProtobuf() *commonpb.PaymentHeader {
return &commonpb.PaymentHeader{
AccountId: pm.AccountID,
BinIndex: pm.BinIndex,
CumulativePayment: pm.CumulativePayment.Bytes(),
}
}

// ConvertPaymentHeader converts a protobuf payment header to a PaymentMetadata
func ConvertPaymentHeader(header *commonpb.PaymentHeader) *PaymentMetadata {
return &PaymentMetadata{
Expand Down
46 changes: 44 additions & 2 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package v2
import (
"encoding/hex"
"errors"
"fmt"
"math"
"math/big"
"strings"

pb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254"
Expand Down Expand Up @@ -72,7 +73,7 @@ type BlobHeader struct {
Signature []byte
}

func NewBlobHeader(proto *pb.BlobHeader) (*BlobHeader, error) {
func NewBlobHeader(proto *commonpb.BlobHeader) (*BlobHeader, error) {
commitment, err := new(encoding.G1Commitment).Deserialize(proto.GetCommitment().GetCommitment())
if err != nil {
return nil, err
Expand Down Expand Up @@ -126,6 +127,26 @@ func NewBlobHeader(proto *pb.BlobHeader) (*BlobHeader, error) {
}, nil
}

func (b *BlobHeader) ToProtobuf() (*commonpb.BlobHeader, error) {
quorums := make([]uint32, len(b.QuorumNumbers))
for i, q := range b.QuorumNumbers {
quorums[i] = uint32(q)
}

commitments, err := b.BlobCommitments.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob commitments to protobuf: %v", err)
}

return &commonpb.BlobHeader{
Version: uint32(b.BlobVersion),
QuorumNumbers: quorums,
Commitment: commitments,
PaymentHeader: b.PaymentMetadata.ToProtobuf(),
Signature: b.Signature,
}, nil
}

func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) {
params := ParametersMap[b.BlobVersion]

Expand Down Expand Up @@ -296,6 +317,27 @@ type BlobCertificate struct {
RelayKeys []RelayKey
}

func (c *BlobCertificate) ToProtobuf() (*commonpb.BlobCertificate, error) {
if c.BlobHeader == nil {
return nil, fmt.Errorf("blob header is nil")
}

blobHeader, err := c.BlobHeader.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob header to protobuf: %v", err)
}

relays := make([]uint32, len(c.RelayKeys))
for i, r := range c.RelayKeys {
relays[i] = uint32(r)
}

return &commonpb.BlobCertificate{
BlobHeader: blobHeader,
Relays: relays,
}, nil
}

type BatchHeader struct {
BatchRoot [32]byte
ReferenceBlockNumber uint64
Expand Down
4 changes: 2 additions & 2 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestV2DisperseBlob(t *testing.T) {
assert.NoError(t, err)
accountID, err := c.Signer.GetAccountID()
assert.NoError(t, err)
commitmentProto, err := commitments.ToProfobuf()
commitmentProto, err := commitments.ToProtobuf()
assert.NoError(t, err)
blobHeaderProto := &pbcommonv2.BlobHeader{
Version: 0,
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) {
BlobHeader: invalidReqProto,
})
assert.ErrorContains(t, err, "blob header must contain commitments")
commitmentProto, err := commitments.ToProfobuf()
commitmentProto, err := commitments.ToProtobuf()
assert.NoError(t, err)

// request with too many quorums
Expand Down
2 changes: 1 addition & 1 deletion encoding/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type BlobCommitments struct {
}

// ToProfobuf converts the BlobCommitments to protobuf format
func (c *BlobCommitments) ToProfobuf() (*pbcommon.BlobCommitment, error) {
func (c *BlobCommitments) ToProtobuf() (*pbcommon.BlobCommitment, error) {
commitData, err := c.Commitment.Serialize()
if err != nil {
return nil, err
Expand Down

0 comments on commit 03807aa

Please sign in to comment.