diff --git a/churner/churner.go b/churner/churner.go index d11ae7147c..0d6d7ace9c 100644 --- a/churner/churner.go +++ b/churner/churner.go @@ -47,7 +47,7 @@ type churner struct { mu sync.Mutex Indexer thegraph.IndexedChainState Transactor core.Transactor - QuorumCount uint16 + QuorumCount uint8 privateKey *ecdsa.PrivateKey logger common.Logger diff --git a/churner/server_test.go b/churner/server_test.go index 377b7ddc18..3222f35f93 100644 --- a/churner/server_test.go +++ b/churner/server_test.go @@ -118,7 +118,7 @@ func setupMockTransactor() { transactorMock.On("OperatorIDToAddress").Return(operatorAddr, nil) transactorMock.On("GetCurrentQuorumBitmapByOperatorId").Return(big.NewInt(2), nil) transactorMock.On("GetCurrentBlockNumber").Return(uint32(2), nil) - transactorMock.On("GetQuorumCount").Return(uint16(1), nil) + transactorMock.On("GetQuorumCount").Return(uint8(1), nil) transactorMock.On("GetOperatorStakesForQuorums").Return(dacore.OperatorStakes{ 0: { 0: { diff --git a/churner/tests/churner_test.go b/churner/tests/churner_test.go index c69dd749a4..1f30206718 100644 --- a/churner/tests/churner_test.go +++ b/churner/tests/churner_test.go @@ -2,9 +2,11 @@ package test import ( "context" + "crypto/rand" "flag" "fmt" "log" + "math/big" "os" "testing" @@ -102,7 +104,20 @@ func TestChurner(t *testing.T) { keyPair, err := dacore.GenRandomBlsKeys() assert.NoError(t, err) - err = operatorTransactor.RegisterBLSPublicKey(ctx, keyPair) + quorumIds_ := make([]uint8, len(quorumIds)) + for i, q := range quorumIds { + quorumIds_[i] = uint8(q) + } + + operatorSalt := [32]byte{} + _, err = rand.Read(operatorSalt[:]) + assert.NoError(t, err) + + expiry := big.NewInt(1000) + privKey, err := crypto.GenerateKey() + assert.NoError(t, err) + + err = operatorTransactor.RegisterOperator(ctx, keyPair, "socket", quorumIds_, privKey, operatorSalt, expiry) assert.NoError(t, err) server := newTestServer(t) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 783ee8a959..3888d4b17e 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -38,7 +38,7 @@ type DispersalServer struct { blobStore disperser.BlobStore tx core.Transactor - quorumCount uint16 + quorumCount uint8 rateConfig RateConfig ratelimiter common.RateLimiter @@ -177,13 +177,13 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob) (*p } seenQuorums[param.QuorumID] = struct{}{} - if uint16(param.QuorumID) >= s.quorumCount { + if param.QuorumID >= s.quorumCount { err := s.updateQuorumCount(ctx) if err != nil { return nil, fmt.Errorf("failed to get onchain quorum count: %w", err) } - if uint16(param.QuorumID) >= s.quorumCount { + if param.QuorumID >= s.quorumCount { return nil, fmt.Errorf("invalid request: the quorum_id must be in range [0, %d], but found %d", s.quorumCount-1, param.QuorumID) } } diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 3df211b0bb..190fa1708a 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -310,8 +310,8 @@ func TestRatelimit(t *testing.T) { }) assert.ErrorContains(t, err, "account throughput limit") - // Try with non-allowlisted IP. Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. - for i := 0; i < 10; i++ { + // Try with non-allowlisted IP. Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 20 blobs. + for i := 0; i < 20; i++ { _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ Data: data1KiB, SecurityParams: []*pb.SecurityParams{ @@ -463,7 +463,7 @@ func newTestServer(m *testing.M) *apiserver.DispersalServer { queue = blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) tx := &mock.MockTransactor{} tx.On("GetCurrentBlockNumber").Return(uint32(100), nil) - tx.On("GetQuorumCount").Return(uint16(2), nil) + tx.On("GetQuorumCount").Return(uint8(2), nil) return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index ffd2eed7da..39353d6422 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -486,21 +486,21 @@ func (b *Batcher) parseBatchIDFromReceipt(ctx context.Context, txReceipt *types. if log.Topics[0] == common.BatchConfirmedEventSigHash { smAbi, err := abi.JSON(bytes.NewReader(common.ServiceManagerAbi)) if err != nil { - return 0, err + return 0, fmt.Errorf("failed to parse ServiceManager ABI: %w", err) } eventAbi, err := smAbi.EventByID(common.BatchConfirmedEventSigHash) if err != nil { - return 0, err + return 0, fmt.Errorf("failed to parse BatchConfirmed event ABI: %w", err) } unpackedData, err := eventAbi.Inputs.Unpack(log.Data) if err != nil { - return 0, err + return 0, fmt.Errorf("failed to unpack BatchConfirmed log data: %w", err) } - // There should be exactly two inputs in the data field, batchId and fee. - // ref: https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L20 - if len(unpackedData) != 2 { - return 0, fmt.Errorf("BatchConfirmed log should contain exactly 2 inputs. Found %d", len(unpackedData)) + // There should be exactly one input in the data field, batchId. + // Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L17 + if len(unpackedData) != 1 { + return 0, fmt.Errorf("BatchConfirmed log should contain exactly 1 inputs. Found %d", len(unpackedData)) } return unpackedData[0].(uint32), nil } diff --git a/node/node.go b/node/node.go index 23ead4dd91..3f8fd3036a 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,8 @@ import ( "github.com/Layr-Labs/eigenda/common/pubip" gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/common" @@ -21,6 +23,7 @@ import ( "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/core/indexer" "github.com/Layr-Labs/eigensdk-go/chainio/constructor" + "github.com/Layr-Labs/eigensdk-go/metrics" "github.com/Layr-Labs/eigensdk-go/metrics/collectors/economic" rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls" "github.com/Layr-Labs/eigensdk-go/nodeapi" @@ -62,12 +65,16 @@ type Node struct { // NewNode creates a new Node with the provided config. func NewNode(config *Config, pubIPProvider pubip.Provider, logger common.Logger) (*Node, error) { // Setup metrics - sdkClients, err := buildSdkClients(config, logger) - if err != nil { - return nil, err - } - metrics := NewMetrics(sdkClients.Metrics, sdkClients.PrometheusRegistry, logger, ":"+config.MetricsPort) - rpcCallsCollector := rpccalls.NewCollector(AppName, sdkClients.PrometheusRegistry) + // sdkClients, err := buildSdkClients(config, logger) + // if err != nil { + // return nil, err + // } + + promReg := prometheus.NewRegistry() + eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger) + + metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort) + rpcCallsCollector := rpccalls.NewCollector(AppName, promReg) // Generate BLS keys keyPair, err := core.MakeKeyPairFromString(config.PrivateBls) @@ -183,14 +190,19 @@ func (n *Node) Start(ctx context.Context) error { n.Config.ID, "hostname", n.Config.Hostname, "dispersalPort", n.Config.DispersalPort, "retrievalPort", n.Config.RetrievalPort, "churnerUrl", n.Config.ChurnerUrl, "quorumIds", n.Config.QuorumIDList) socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort)) + privateKey, err := crypto.HexToECDSA(n.Config.EthClientConfig.PrivateKeyString) + if err != nil { + return fmt.Errorf("NewClient: cannot parse private key: %w", err) + } operator := &Operator{ Socket: socket, Timeout: 10 * time.Second, + PrivKey: privateKey, KeyPair: n.KeyPair, OperatorId: n.Config.ID, QuorumIDs: n.Config.QuorumIDList, } - err := RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger) + err = RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger) if err != nil { return fmt.Errorf("failed to register the operator: %w", err) } diff --git a/node/operator.go b/node/operator.go index 456dabdbe1..108bef0159 100644 --- a/node/operator.go +++ b/node/operator.go @@ -2,9 +2,11 @@ package node import ( "context" + "crypto/ecdsa" "crypto/tls" "errors" "fmt" + "math/big" "time" grpcchurner "github.com/Layr-Labs/eigenda/api/grpc/churner" @@ -20,6 +22,7 @@ import ( type Operator struct { Socket string Timeout time.Duration + PrivKey *ecdsa.PrivateKey KeyPair *core.KeyPair OperatorId core.OperatorID QuorumIDs []core.QuorumID @@ -37,12 +40,6 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T return nil } - // if the operator is not registered, we may need to register the BLSPublicKey - err = transactor.RegisterBLSPublicKey(ctx, operator.KeyPair) - if err != nil { - return fmt.Errorf("failed to register the nodes bls public key: %w", err) - } - logger.Info("Quorums to register for", "quorums", operator.QuorumIDs) if len(operator.QuorumIDs) == 0 { @@ -72,6 +69,15 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T logger.Info("Should call churner", "shouldCallChurner", shouldCallChurner) + // Generate salt and expiry + + privateKeyBytes := []byte(operator.KeyPair.PrivKey.String()) + salt := [32]byte{} + copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), operator.QuorumIDs[:], privateKeyBytes)) + + // Get the current block number + expiry := big.NewInt((time.Now().Add(10 * time.Minute)).Unix()) + // if we should call the churner, call it if shouldCallChurner { churnReply, err := requestChurnApproval(ctx, operator, churnerUrl, useSecureGrpc, logger) @@ -79,10 +85,10 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T return fmt.Errorf("failed to request churn approval: %w", err) } - return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair.PubKey, operator.Socket, operator.QuorumIDs, churnReply) + return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry, churnReply) } else { // other wise just register normally - return transactor.RegisterOperator(ctx, operator.KeyPair.PubKey, operator.Socket, operator.QuorumIDs) + return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry) } } diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go index e3fbe59edd..a85c4382ab 100644 --- a/node/plugin/cmd/main.go +++ b/node/plugin/cmd/main.go @@ -118,6 +118,7 @@ func pluginOps(ctx *cli.Context) { operator := &node.Operator{ Socket: socket, Timeout: 10 * time.Second, + PrivKey: sk.PrivateKey, KeyPair: keyPair, OperatorId: keyPair.GetPubKeyG1().GetOperatorID(), QuorumIDs: config.QuorumIDList,