Skip to content

Commit

Permalink
Merge master into m2 mainnet contracts (#220)
Browse files Browse the repository at this point in the history
Co-authored-by: siddimore <[email protected]>
Co-authored-by: Ian Shim <[email protected]>
Co-authored-by: bolatfurkan <[email protected]>
Co-authored-by: Peter Straus <[email protected]>
Co-authored-by: WELLINGTON MIRANDA BARBOSA <[email protected]>
Co-authored-by: Wellington Barbosa <[email protected]>
Co-authored-by: Jian Xiao <[email protected]>
Co-authored-by: buldazer <[email protected]>
Co-authored-by: Madhur Shrimal <[email protected]>
Co-authored-by: Daniel Mancia <[email protected]>
Co-authored-by: Jian Xiao <[email protected]>
  • Loading branch information
12 people authored Jan 31, 2024
1 parent d104c9d commit 0efdbf0
Show file tree
Hide file tree
Showing 82 changed files with 1,683 additions and 670 deletions.
60 changes: 60 additions & 0 deletions api/grpc/mock/disperser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package mock

import (
"context"
"errors"

"github.com/Layr-Labs/eigenda/api/grpc/disperser"

"google.golang.org/grpc"
)

func MakeStreamMock(ctx context.Context) *StreamMock {
return &StreamMock{
ctx: ctx,
recvToServer: make(chan *disperser.AuthenticatedRequest, 10),
sentFromServer: make(chan *disperser.AuthenticatedReply, 10),
}
}

type StreamMock struct {
grpc.ServerStream
ctx context.Context
recvToServer chan *disperser.AuthenticatedRequest
sentFromServer chan *disperser.AuthenticatedReply
}

func (m *StreamMock) Context() context.Context {
return m.ctx
}

func (m *StreamMock) Send(resp *disperser.AuthenticatedReply) error {
m.sentFromServer <- resp
return nil
}

func (m *StreamMock) Recv() (*disperser.AuthenticatedRequest, error) {
req, more := <-m.recvToServer
if !more {
return nil, errors.New("empty")
}
return req, nil
}

func (m *StreamMock) SendFromClient(req *disperser.AuthenticatedRequest) error {
m.recvToServer <- req
return nil
}

func (m *StreamMock) RecvToClient() (*disperser.AuthenticatedReply, error) {
response, more := <-m.sentFromServer
if !more {
return nil, errors.New("empty")
}
return response, nil
}

func (m *StreamMock) Close() {
close(m.recvToServer)
close(m.sentFromServer)
}
5 changes: 4 additions & 1 deletion api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ message DisperseBlobRequest {
// within the same batch.
repeated SecurityParams security_params = 2;

// The account ID of the client. This should be a hex-encoded string of the ECSDA public key
// corresponding to the key used by the client to sign the BlobAuthHeader.
string account_id = 3;
}

Expand Down Expand Up @@ -252,7 +254,8 @@ message BatchMetadata {
message BatchHeader {
// The root of the merkle tree with the hashes of blob headers as leaves.
bytes batch_root = 1;
// All quorums associated with blobs in this batch.
// All quorums associated with blobs in this batch. Sorted in ascending order.
// Ex. [0, 2, 1] => 0x000102
bytes quorum_numbers = 2;
// The percentage of stake that has signed for this batch.
// The quorum_signed_percentages[i] is percentage for the quorum_numbers[i].
Expand Down
2 changes: 1 addition & 1 deletion churner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
FailReasonInsufficientStakeToChurn FailReason = "insufficient_stake_to_churn" // Operator doesn't have enough stake to be churned
FailReasonQuorumIdOutOfRange FailReason = "quorum_id_out_of_range" // Quorum ID out of range: quorum is not in the range of [0, QuorumCount]
FailReasonPrevApprovalNotExpired FailReason = "prev_approval_not_expired" // Expiry: previous approval hasn't expired
FailReasonInvalidSignature FailReason = "invalid_signature" // Invalid signature: operator's signature is wong
FailReasonInvalidSignature FailReason = "invalid_signature" // Invalid signature: operator's signature is wrong
FailReasonProcessChurnRequestFailed FailReason = "failed_process_churn_request" // Failed to process churn request
FailReasonInvalidRequest FailReason = "invalid_request" // Invalid request: request is malformed
)
Expand Down
2 changes: 1 addition & 1 deletion churner/tests/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func setup(m *testing.M) {

func teardown() {
if testConfig != nil {
fmt.Println("Stoping anvil")
fmt.Println("Stopping anvil")
testConfig.StopAnvil()
testConfig.StopGraphNode()
}
Expand Down
59 changes: 32 additions & 27 deletions clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/retriever/flags"
"github.com/urfave/cli"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -23,17 +21,18 @@ type Config struct {
UseSecureGrpcFlag bool
}

func NewConfig(ctx *cli.Context) *Config {
func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag bool) *Config {
return &Config{
Hostname: ctx.GlobalString(flags.HostnameFlag.Name),
Port: ctx.GlobalString(flags.GrpcPortFlag.Name),
Timeout: ctx.Duration(flags.TimeoutFlag.Name),
Hostname: hostname,
Port: port,
Timeout: timeout,
UseSecureGrpcFlag: useSecureGrpcFlag,
}
}

type DisperserClient interface {
DisperseBlob(ctx context.Context, data []byte, quorumID, quorumThreshold, adversityThreshold uint32) (*disperser.BlobStatus, []byte, error)
DisperseBlobAuthenticated(ctx context.Context, data []byte, quorumID, quorumThreshold, adversityThreshold uint32) (*disperser.BlobStatus, []byte, error)
DisperseBlob(ctx context.Context, data []byte, securityParams []*core.SecurityParam) (*disperser.BlobStatus, []byte, error)
DisperseBlobAuthenticated(ctx context.Context, data []byte, securityParams []*core.SecurityParam) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
}

Expand All @@ -59,7 +58,7 @@ func (c *disperserClient) getDialOptions() []grpc.DialOption {
}
}

func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorumID, quorumThreshold, adversityThreshold uint32) (*disperser.BlobStatus, []byte, error) {
func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, securityParams []*core.SecurityParam) (*disperser.BlobStatus, []byte, error) {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)

dialOptions := c.getDialOptions()
Expand All @@ -73,15 +72,18 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorumI
ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)
defer cancel()

sp := make([]*disperser_rpc.SecurityParams, len(securityParams))
for i, s := range securityParams {
sp[i] = &disperser_rpc.SecurityParams{
QuorumId: uint32(s.QuorumID),
QuorumThreshold: uint32(s.QuorumThreshold),
AdversaryThreshold: uint32(s.AdversaryThreshold),
}
}

request := &disperser_rpc.DisperseBlobRequest{
Data: data,
SecurityParams: []*disperser_rpc.SecurityParams{
{
QuorumId: quorumID,
QuorumThreshold: quorumThreshold,
AdversaryThreshold: adversityThreshold,
},
},
Data: data,
SecurityParams: sp,
}

reply, err := disperserClient.DisperseBlob(ctxTimeout, request)
Expand All @@ -97,7 +99,7 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorumI
return blobStatus, reply.GetRequestId(), nil
}

func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorumID, quorumThreshold, adversityThreshold uint32) (*disperser.BlobStatus, []byte, error) {
func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, securityParams []*core.SecurityParam) (*disperser.BlobStatus, []byte, error) {

addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)

Expand All @@ -118,16 +120,19 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
return nil, nil, fmt.Errorf("frror while calling DisperseBlobAuthenticated: %v", err)
}

sp := make([]*disperser_rpc.SecurityParams, len(securityParams))
for i, s := range securityParams {
sp[i] = &disperser_rpc.SecurityParams{
QuorumId: uint32(s.QuorumID),
QuorumThreshold: uint32(s.QuorumThreshold),
AdversaryThreshold: uint32(s.AdversaryThreshold),
}
}

request := &disperser_rpc.DisperseBlobRequest{
Data: data,
SecurityParams: []*disperser_rpc.SecurityParams{
{
QuorumId: quorumID,
QuorumThreshold: quorumThreshold,
AdversaryThreshold: adversityThreshold,
},
},
AccountId: c.signer.GetAccountID(),
Data: data,
SecurityParams: sp,
AccountId: c.signer.GetAccountID(),
}

// Send the initial request
Expand Down
9 changes: 5 additions & 4 deletions clients/mock/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/clients"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/stretchr/testify/mock"
)
Expand All @@ -19,8 +20,8 @@ func NewMockDisperserClient() *MockDisperserClient {
return &MockDisperserClient{}
}

func (c *MockDisperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorumID, quorumThreshold, adversityThreshold uint32) (*disperser.BlobStatus, []byte, error) {
args := c.Called(data, quorumID, quorumThreshold, adversityThreshold)
func (c *MockDisperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, securityParams []*core.SecurityParam) (*disperser.BlobStatus, []byte, error) {
args := c.Called(data, securityParams)
var status *disperser.BlobStatus
if args.Get(0) != nil {
status = (args.Get(0)).(*disperser.BlobStatus)
Expand All @@ -36,8 +37,8 @@ func (c *MockDisperserClient) DisperseBlobAuthenticated(ctx context.Context, dat
return status, key, err
}

func (c *MockDisperserClient) DisperseBlob(ctx context.Context, data []byte, quorumID, quorumThreshold, adversityThreshold uint32) (*disperser.BlobStatus, []byte, error) {
args := c.Called(data, quorumID, quorumThreshold, adversityThreshold)
func (c *MockDisperserClient) DisperseBlob(ctx context.Context, data []byte, securityParams []*core.SecurityParam) (*disperser.BlobStatus, []byte, error) {
args := c.Called(data, securityParams)
var status *disperser.BlobStatus
if args.Get(0) != nil {
status = (args.Get(0)).(*disperser.BlobStatus)
Expand Down
2 changes: 1 addition & 1 deletion clients/tests/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func setup(t *testing.T) {
}

func mustMakeOpertatorPubKeysPair(t *testing.T) *coreindexer.OperatorPubKeys {
operators := make(map[[32]byte]coreindexer.OperatorPubKeysPair, len(operatorState.Operators))
operators := make(map[core.OperatorID]coreindexer.OperatorPubKeysPair, len(operatorState.Operators))
for operatorId := range operatorState.Operators[0] {
keyPair, err := core.GenRandomBlsKeys()
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,22 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
return response.Items, nil
}

// QueryIndexCount returns the count of the items in the index that match the given key
func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
Select: types.SelectCount,
})
if err != nil {
return 0, err
}

return response.Count, nil
}

// QueryIndexWithPagination returns all items in the index that match the given key
// Results are limited to the given limit and the pagination token is returned
// When limit is is 0, all items are returned
Expand Down
66 changes: 64 additions & 2 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,59 @@ func TestQueryIndex(t *testing.T) {
assert.Equal(t, len(queryResult), 30)
}

func TestQueryIndexCount(t *testing.T) {
tableName := "ProcessingQueryIndexCount"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
numItemsProcessing := 10
items1 := make([]commondynamodb.Item, numItemsProcessing)
for i := 0; i < numItemsProcessing; i += 1 {
items1[i] = commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
}
}

numItemsConfirmed := 20
items2 := make([]commondynamodb.Item, numItemsConfirmed)
for i := 0; i < numItemsConfirmed; i += 1 {
items2[i] = commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i+numItemsProcessing)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i+numItemsProcessing)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "1"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
}
}

unprocessed, err := dynamoClient.PutItems(ctx, tableName, items1)
assert.NoError(t, err)
assert.Len(t, unprocessed, 0)

unprocessed, err = dynamoClient.PutItems(ctx, tableName, items2)
assert.NoError(t, err)
assert.Len(t, unprocessed, 0)

count, err := dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}})
assert.NoError(t, err)
assert.Equal(t, int(count), 10)

count, err = dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "1",
}})
assert.NoError(t, err)
assert.Equal(t, int(count), 20)
}

func TestQueryIndexPaginationSingleItem(t *testing.T) {
tableName := "ProcessingWithPaginationSingleItem"
createTable(t, tableName)
Expand Down Expand Up @@ -349,7 +402,7 @@ func TestQueryIndexPaginationItemNoLimit(t *testing.T) {
ctx := context.Background()
numItems := 30
for i := 0; i < numItems; i += 1 {
requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix()
requestedAt := time.Now().Add(-time.Duration(3*i) * time.Second).Unix()

// Create new item
item := commondynamodb.Item{
Expand Down Expand Up @@ -409,7 +462,16 @@ func TestQueryIndexPagination(t *testing.T) {
ctx := context.Background()
numItems := 30
for i := 0; i < numItems; i += 1 {
requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix()
// Noticed same timestamp for multiple items which resulted in key28
// being returned when 10 items were queried as first item,hence multiplying
// by random number 3 here to avoid such a situation
// requestedAt: 1705040877
// metadataKey: key28
// BlobKey: blob28
// requestedAt: 1705040877
// metadataKey: key29
// BlobKey: blob29
requestedAt := time.Now().Add(-time.Duration(3*i) * time.Second).Unix()

// Create new item
item := commondynamodb.Item{
Expand Down
4 changes: 2 additions & 2 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Trans
c.Logger.Error("Transaction Failed", "tag", tag, "txHash", tx.Hash().Hex(), "status", receipt.Status, "GasUsed", receipt.GasUsed)
return nil, ErrTransactionFailed
}
c.Logger.Trace("successfully submitted transaction", "txHash", tx.Hash().Hex(), "tag", tag, "gasUsed", receipt.GasUsed)
c.Logger.Trace("transaction confirmed", "txHash", tx.Hash().Hex(), "tag", tag, "gasUsed", receipt.GasUsed)
return receipt, nil
}

Expand Down Expand Up @@ -239,7 +239,7 @@ func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*type
}

if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined")
c.Logger.Trace("Transaction not yet mined", "txHash", tx.Hash().Hex())
} else if err != nil {
c.Logger.Trace("Receipt retrieval failed", "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion common/geth/instrumented_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ func instrumentFunction[T any](

// Not sure why this method is not exposed in the ethclient itself...
// but it is needed to comply with the rpc metrics defined in avs-node spec
// https://eigen.nethermind.io/docs/metrics/metrics-prom-spec
// https://eigen.nethermind.io/docs/spec/metrics/metrics-prom-spec
func getClientAndVersion(client *EthClient) string {
var clientVersion string
err := client.Client.Client().Call(&clientVersion, "web3_clientVersion")
Expand Down
7 changes: 5 additions & 2 deletions common/mock/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,11 @@ func (mock *MockEthClient) GetLatestGasCaps(ctx context.Context) (gasTipCap, gas

func (mock *MockEthClient) UpdateGas(ctx context.Context, tx *types.Transaction, value, gasTipCap, gasFeeCap *big.Int) (*types.Transaction, error) {
args := mock.Called()
result := args.Get(0)
return result.(*types.Transaction), args.Error(1)
var newTx *types.Transaction
if args.Get(0) != nil {
newTx = args.Get(0).(*types.Transaction)
}
return newTx, args.Error(1)
}

func (mock *MockEthClient) EstimateGasPriceAndLimitAndSendTx(ctx context.Context, tx *types.Transaction, tag string, value *big.Int) (*types.Receipt, error) {
Expand Down
Loading

0 comments on commit 0efdbf0

Please sign in to comment.