From c78de0093d926daffd25335fb26141bdc04b0533 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Thu, 7 Nov 2024 09:35:27 -0800 Subject: [PATCH] v2 dispatcher --- core/serialization.go | 21 + core/v2/types.go | 3 +- disperser/batcher/batcher.go | 10 +- .../v2/blobstore/dynamo_metadata_store.go | 100 +++- .../blobstore/dynamo_metadata_store_test.go | 44 ++ disperser/controller/dispatcher.go | 432 ++++++++++++++++++ disperser/controller/dispatcher_test.go | 313 +++++++++++++ disperser/controller/encoding_manager.go | 2 +- disperser/controller/encoding_manager_test.go | 10 +- .../controller/mock_node_client_manager.go | 19 + disperser/controller/node_client_manager.go | 60 +++ .../controller/node_client_manager_test.go | 43 ++ 12 files changed, 1040 insertions(+), 17 deletions(-) create mode 100644 disperser/controller/dispatcher.go create mode 100644 disperser/controller/dispatcher_test.go create mode 100644 disperser/controller/mock_node_client_manager.go create mode 100644 disperser/controller/node_client_manager.go create mode 100644 disperser/controller/node_client_manager_test.go diff --git a/core/serialization.go b/core/serialization.go index a0159efabd..b42015d069 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -393,6 +393,27 @@ func (h *BlobHeader) Deserialize(data []byte) (*BlobHeader, error) { return h, err } +func SerializeMerkleProof(proof *merkletree.Proof) []byte { + proofBytes := make([]byte, 0) + for _, hash := range proof.Hashes { + proofBytes = append(proofBytes, hash[:]...) + } + return proofBytes +} + +func DeserializeMerkleProof(data []byte) (*merkletree.Proof, error) { + proof := &merkletree.Proof{} + if len(data)%32 != 0 { + return nil, fmt.Errorf("invalid proof length") + } + for i := 0; i < len(data); i += 32 { + var hash [32]byte + copy(hash[:], data[i:i+32]) + proof.Hashes = append(proof.Hashes, hash[:]) + } + return proof, nil +} + func encode(obj any) ([]byte, error) { var buf bytes.Buffer enc := gob.NewEncoder(&buf) diff --git a/core/v2/types.go b/core/v2/types.go index e2d2ac5813..f77162e81f 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -19,7 +19,7 @@ var ( // TODO(mooselumph): Put these parameters on chain and add on-chain checks to ensure that the number of operators does not // conflict with the existing on-chain limits ParametersMap = map[BlobVersion]BlobVersionParameters{ - 0: {CodingRate: 8, ReconstructionThreshold: 0.22, NumChunks: 8192}, + 0: {CodingRate: 8, ReconstructionThreshold: 0.22, ConfirmationThreshold: 55, NumChunks: 8192}, } ) @@ -228,6 +228,7 @@ type BlobVerificationInfo struct { type BlobVersionParameters struct { CodingRate uint32 ReconstructionThreshold float64 + ConfirmationThreshold uint8 NumChunks uint32 } diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index f5fef3d38f..d970eb8516 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -302,7 +302,7 @@ func (b *Batcher) updateConfirmationInfo( blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex]) continue } - proof = serializeProof(merkleProof) + proof = core.SerializeMerkleProof(merkleProof) } confirmationInfo := &disperser.ConfirmationInfo{ @@ -563,14 +563,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { return nil } -func serializeProof(proof *merkletree.Proof) []byte { - proofBytes := make([]byte, 0) - for _, hash := range proof.Hashes { - proofBytes = append(proofBytes, hash[:]...) - } - return proofBytes -} - func (b *Batcher) parseBatchIDFromReceipt(txReceipt *types.Receipt) (uint32, error) { if len(txReceipt.Logs) == 0 { return 0, errors.New("failed to get transaction receipt with logs") diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index 621462d78c..cbddc7cfb7 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -156,7 +156,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status Value: strconv.Itoa(int(status)), }, ":updatedAt": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), + Value: strconv.FormatInt(int64(lastUpdatedAt), 10), }}) if err != nil { return nil, err @@ -422,6 +422,76 @@ func (s *BlobMetadataStore) GetAttestation(ctx context.Context, batchHeaderHash return attestation, nil } +func (s *BlobMetadataStore) PutBlobVerificationInfo(ctx context.Context, verificationInfo *corev2.BlobVerificationInfo) error { + item, err := MarshalBlobVerificationInfo(verificationInfo) + if err != nil { + return err + } + + err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil) + if errors.Is(err, commondynamodb.ErrConditionFailed) { + return common.ErrAlreadyExists + } + + return err +} + +func (s *BlobMetadataStore) GetBlobVerificationInfo(ctx context.Context, blobKey corev2.BlobKey, batchHeaderHash [32]byte) (*corev2.BlobVerificationInfo, error) { + bhh := hex.EncodeToString(batchHeaderHash[:]) + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKeyPrefix + blobKey.Hex(), + }, + "SK": &types.AttributeValueMemberS{ + Value: batchHeaderKeyPrefix + bhh, + }, + }) + + if err != nil { + return nil, err + } + + if item == nil { + return nil, fmt.Errorf("%w: verification info not found for key %s", common.ErrMetadataNotFound, blobKey.Hex()) + } + + info, err := UnmarshalBlobVerificationInfo(item) + if err != nil { + return nil, err + } + + return info, nil +} + +func (s *BlobMetadataStore) GetBlobVerificationInfos(ctx context.Context, blobKey corev2.BlobKey) ([]*corev2.BlobVerificationInfo, error) { + items, err := s.dynamoDBClient.Query(ctx, s.tableName, "PK = :pk AND begins_with(SK, :prefix)", commondynamodb.ExpressionValues{ + ":pk": &types.AttributeValueMemberS{ + Value: blobKeyPrefix + blobKey.Hex(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: batchHeaderKeyPrefix, + }, + }) + + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("%w: verification info not found for key %s", common.ErrMetadataNotFound, blobKey.Hex()) + } + + responses := make([]*corev2.BlobVerificationInfo, len(items)) + for i, item := range items { + responses[i], err = UnmarshalBlobVerificationInfo(item) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal verification info: %w", err) + } + } + + return responses, nil +} + func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ @@ -749,6 +819,34 @@ func UnmarshalBatchHeader(item commondynamodb.Item) (*corev2.BatchHeader, error) return &header, nil } +func MarshalBlobVerificationInfo(verificationInfo *corev2.BlobVerificationInfo) (commondynamodb.Item, error) { + fields, err := attributevalue.MarshalMap(verificationInfo) + if err != nil { + return nil, fmt.Errorf("failed to marshal blob verification info: %w", err) + } + + bhh, err := verificationInfo.BatchHeader.Hash() + if err != nil { + return nil, fmt.Errorf("failed to hash batch header: %w", err) + } + hashstr := hex.EncodeToString(bhh[:]) + + fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + verificationInfo.BlobKey.Hex()} + fields["SK"] = &types.AttributeValueMemberS{Value: batchHeaderKeyPrefix + hashstr} + + return fields, nil +} + +func UnmarshalBlobVerificationInfo(item commondynamodb.Item) (*corev2.BlobVerificationInfo, error) { + verificationInfo := corev2.BlobVerificationInfo{} + err := attributevalue.UnmarshalMap(item, &verificationInfo) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal blob verification info: %w", err) + } + + return &verificationInfo, nil +} + func MarshalAttestation(attestation *corev2.Attestation) (commondynamodb.Item, error) { fields, err := attributevalue.MarshalMap(attestation) if err != nil { diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index 89f5585b8f..e81b923c25 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -77,6 +77,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) { assert.NoError(t, err) assert.Len(t, queued, 1) assert.Equal(t, metadata1, queued[0]) + // query to get newer blobs should result in 0 results + queued, err = blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, metadata1.UpdatedAt+100) + assert.NoError(t, err) + assert.Len(t, queued, 0) + certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0) assert.NoError(t, err) assert.Len(t, certified, 1) @@ -150,6 +155,45 @@ func TestBlobMetadataStoreCerts(t *testing.T) { err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo) assert.ErrorIs(t, err, common.ErrAlreadyExists) + // get multiple certs + numCerts := 100 + keys := make([]corev2.BlobKey, numCerts) + for i := 0; i < numCerts; i++ { + blobCert := &corev2.BlobCertificate{ + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: uint32(i), + CumulativePayment: big.NewInt(532), + }, + Signature: []byte("signature"), + }, + RelayKeys: []corev2.RelayKey{0}, + } + blobKey, err := blobCert.BlobHeader.BlobKey() + assert.NoError(t, err) + keys[i] = blobKey + err = blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo) + assert.NoError(t, err) + } + + certs, fragmentInfos, err := blobMetadataStore.GetBlobCertificates(ctx, keys) + assert.NoError(t, err) + assert.Len(t, certs, numCerts) + assert.Len(t, fragmentInfos, numCerts) + binIndexes := make(map[uint32]struct{}) + for i := 0; i < numCerts; i++ { + assert.Equal(t, fragmentInfos[i], fragmentInfo) + binIndexes[certs[i].BlobHeader.PaymentMetadata.BinIndex] = struct{}{} + } + assert.Len(t, binIndexes, numCerts) + for i := 0; i < numCerts; i++ { + assert.Contains(t, binIndexes, uint32(i)) + } + deleteItems(t, []commondynamodb.Key{ { "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()}, diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go new file mode 100644 index 0000000000..d4737831e8 --- /dev/null +++ b/disperser/controller/dispatcher.go @@ -0,0 +1,432 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "math" + "time" + + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigensdk-go/logging" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/keccak256" +) + +var errNoBlobsToDispatch = errors.New("no blobs to dispatch") + +type DispatcherConfig struct { + PullInterval time.Duration + + FinalizationBlockDelay uint64 + NodeRequestTimeout time.Duration + NumRequestRetries int + NumConnectionsToNodes int +} + +type Dispatcher struct { + DispatcherConfig + + blobMetadataStore *blobstore.BlobMetadataStore + pool common.WorkerPool + chainState core.IndexedChainState + aggregator core.SignatureAggregator + nodeClientManager NodeClientManager + logger logging.Logger + + lastUpdatedAt uint64 +} + +type batchData struct { + Batch *corev2.Batch + BatchHeaderHash [32]byte + BlobKeys []corev2.BlobKey + OperatorState *core.IndexedOperatorState +} + +func NewDispatcher( + config DispatcherConfig, + blobMetadataStore *blobstore.BlobMetadataStore, + pool common.WorkerPool, + chainState core.IndexedChainState, + aggregator core.SignatureAggregator, + nodeClientManager NodeClientManager, + logger logging.Logger, +) (*Dispatcher, error) { + return &Dispatcher{ + DispatcherConfig: config, + + blobMetadataStore: blobMetadataStore, + pool: pool, + chainState: chainState, + aggregator: aggregator, + nodeClientManager: nodeClientManager, + logger: logger.With("component", "Dispatcher"), + + lastUpdatedAt: 0, + }, nil +} + +func (d *Dispatcher) Start(ctx context.Context) error { + go func() { + ticker := time.NewTicker(d.PullInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + sigChan, batchData, err := d.HandleBatch(ctx) + if err != nil { + if errors.Is(err, errNoBlobsToDispatch) { + d.logger.Warn("no blobs to dispatch") + } else { + d.logger.Error("failed to process a batch", "err", err) + } + continue + } + + go func() { + err := d.HandleSignatures(ctx, batchData, sigChan) + if err != nil { + d.logger.Error("failed to handle signatures", "err", err) + } + }() + } + } + }() + + return nil + +} + +func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, *batchData, error) { + currentBlockNumber, err := d.chainState.GetCurrentBlockNumber() + if err != nil { + return nil, nil, fmt.Errorf("failed to get current block number: %w", err) + } + referenceBlockNumber := uint64(currentBlockNumber) - d.FinalizationBlockDelay + + // Get a batch of blobs to dispatch + // This also writes a batch header and blob verification info for each blob in metadata store + batchData, err := d.NewBatch(ctx, referenceBlockNumber) + if err != nil { + return nil, nil, err + } + + batch := batchData.Batch + state := batchData.OperatorState + sigChan := make(chan core.SigningMessage, len(state.IndexedOperators)) + for opID, op := range state.IndexedOperators { + opID := opID + op := op + client, err := d.nodeClientManager.GetClient(opID, op.Socket) + if err != nil { + d.logger.Error("failed to get node client", "operator", opID, "err", err) + continue + } + + d.pool.Submit(func() { + + req := &corev2.DispersalRequest{ + OperatorID: opID, + // TODO: get OperatorAddress + OperatorAddress: gethcommon.Address{}, + Socket: op.Socket, + DispersedAt: uint64(time.Now().UnixNano()), + BatchHeader: *batch.BatchHeader, + } + err := d.blobMetadataStore.PutDispersalRequest(ctx, req) + if err != nil { + d.logger.Error("failed to put dispersal request", "err", err) + return + } + + for i := 0; i < d.NumRequestRetries+1; i++ { + sig, err := d.sendChunks(ctx, client, batch) + if err == nil { + storeErr := d.blobMetadataStore.PutDispersalResponse(ctx, &corev2.DispersalResponse{ + DispersalRequest: req, + RespondedAt: uint64(time.Now().UnixNano()), + Signature: sig.Bytes(), + Error: "", + }) + if storeErr != nil { + d.logger.Error("failed to put dispersal response", "err", storeErr) + } + + sigChan <- core.SigningMessage{ + Signature: sig, + Operator: opID, + BatchHeaderHash: batchData.BatchHeaderHash, + AttestationLatencyMs: 0, // TODO: calculate latency + Err: nil, + } + + break + } + + d.logger.Warn("failed to send chunks", "operator", opID, "NumAttempts", i, "err", err) + time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying + } + }) + } + + return sigChan, batchData, nil +} + +// HandleSignatures receives signatures from operators, validates, and aggregates them +func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error { + quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan) + if err != nil { + return fmt.Errorf("failed to receive and validate signatures: %w", err) + } + numPassed, passedQuorums := numBlobsAttestedByQuorum(quorumAttestation.QuorumResults, batchData.Batch.BlobCertificates) + if numPassed == 0 { + err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Failed) + if err != nil { + d.logger.Error("failed to mark blobs as failed", "err", err) + } + return fmt.Errorf("no blobs attested by any quorum") + } + + nonEmptyQuorums := []core.QuorumID{} + for quorumID := range passedQuorums { + d.logger.Info("quorum successfully attested", "quorumID", quorumID) + nonEmptyQuorums = append(nonEmptyQuorums, quorumID) + } + aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonEmptyQuorums) + if err != nil { + return fmt.Errorf("failed to aggregate signatures: %w", err) + } + err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{ + BatchHeader: batchData.Batch.BatchHeader, + AttestedAt: uint64(time.Now().UnixNano()), + NonSignerPubKeys: aggSig.NonSigners, + APKG2: aggSig.AggPubKey, + QuorumAPKs: aggSig.QuorumAggPubKeys, + Sigma: aggSig.AggSignature, + QuorumNumbers: nonEmptyQuorums, + }) + if err != nil { + return fmt.Errorf("failed to put attestation: %w", err) + } + + err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified) + if err != nil { + return fmt.Errorf("failed to mark blobs as certified: %w", err) + } + + return nil +} + +// NewBatch creates a batch of blobs to dispatch +// Warning: This function is not thread-safe +func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) (*batchData, error) { + blobMetadatas, err := d.blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Encoded, d.lastUpdatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get blob metadata by status: %w", err) + } + + if len(blobMetadatas) == 0 { + return nil, errNoBlobsToDispatch + } + + state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber) + if err != nil { + return nil, fmt.Errorf("failed to get operator state: %w", err) + } + + keys := make([]corev2.BlobKey, len(blobMetadatas)) + for i, metadata := range blobMetadatas { + if metadata == nil || metadata.BlobHeader == nil { + return nil, fmt.Errorf("invalid blob metadata") + } + blobKey, err := metadata.BlobHeader.BlobKey() + if err != nil { + return nil, fmt.Errorf("failed to get blob key: %w", err) + } + keys[i] = blobKey + if metadata.UpdatedAt > d.lastUpdatedAt { + d.lastUpdatedAt = metadata.UpdatedAt + } + } + + certs, _, err := d.blobMetadataStore.GetBlobCertificates(ctx, keys) + if err != nil { + return nil, fmt.Errorf("failed to get blob certificates: %w", err) + } + + if len(certs) != len(keys) { + return nil, fmt.Errorf("blob certificates not found for all blob keys") + } + + certsMap := make(map[corev2.BlobKey]*corev2.BlobCertificate, len(certs)) + for _, cert := range certs { + blobKey, err := cert.BlobHeader.BlobKey() + if err != nil { + return nil, fmt.Errorf("failed to get blob key: %w", err) + } + + certsMap[blobKey] = cert + } + + // Keep the order of certs the same as the order of keys + for i, key := range keys { + c, ok := certsMap[key] + if !ok { + return nil, fmt.Errorf("blob certificate not found for blob key %s", key.Hex()) + } + certs[i] = c + } + + batchHeader := &corev2.BatchHeader{ + BatchRoot: [32]byte{}, + ReferenceBlockNumber: referenceBlockNumber, + } + + tree, err := BuildMerkleTree(certs) + if err != nil { + return nil, fmt.Errorf("failed to build merkle tree: %w", err) + } + + copy(batchHeader.BatchRoot[:], tree.Root()) + + batchHeaderHash, err := batchHeader.Hash() + if err != nil { + return nil, fmt.Errorf("failed to hash batch header: %w", err) + } + + err = d.blobMetadataStore.PutBatchHeader(ctx, batchHeader) + if err != nil { + return nil, fmt.Errorf("failed to put batch header: %w", err) + } + + for i, cert := range certs { + if cert == nil || cert.BlobHeader == nil { + return nil, fmt.Errorf("invalid blob certificate") + } + blobKey, err := cert.BlobHeader.BlobKey() + if err != nil { + return nil, fmt.Errorf("failed to get blob key: %w", err) + } + + merkleProof, err := tree.GenerateProofWithIndex(uint64(i), 0) + if err != nil { + return nil, fmt.Errorf("failed to generate merkle proof: %w", err) + } + + verificationInfo := &corev2.BlobVerificationInfo{ + BatchHeader: batchHeader, + BlobKey: blobKey, + BlobIndex: uint32(i), + InclusionProof: core.SerializeMerkleProof(merkleProof), + } + + err = d.blobMetadataStore.PutBlobVerificationInfo(ctx, verificationInfo) + if err != nil { + return nil, fmt.Errorf("failed to put blob verification info: %w", err) + } + } + + return &batchData{ + Batch: &corev2.Batch{ + BatchHeader: batchHeader, + BlobCertificates: certs, + }, + BatchHeaderHash: batchHeaderHash, + BlobKeys: keys, + OperatorState: state, + }, nil +} + +// GetOperatorState returns the operator state for the given quorums at the given block number +func (d *Dispatcher) GetOperatorState(ctx context.Context, metadatas []*v2.BlobMetadata, blockNumber uint64) (*core.IndexedOperatorState, error) { + quorums := make(map[core.QuorumID]struct{}, 0) + for _, m := range metadatas { + for _, quorum := range m.BlobHeader.QuorumNumbers { + quorums[quorum] = struct{}{} + } + } + + quorumIds := make([]core.QuorumID, len(quorums)) + i := 0 + for id := range quorums { + quorumIds[i] = id + i++ + } + + // GetIndexedOperatorState should return state for valid quorums only + return d.chainState.GetIndexedOperatorState(ctx, uint(blockNumber), quorumIds) +} + +func (d *Dispatcher) sendChunks(ctx context.Context, client clients.NodeClientV2, batch *corev2.Batch) (*core.Signature, error) { + ctxWithTimeout, cancel := context.WithTimeout(ctx, d.NodeRequestTimeout) + defer cancel() + + sig, err := client.StoreChunks(ctxWithTimeout, batch) + if err != nil { + return nil, fmt.Errorf("failed to store chunks: %w", err) + } + + return sig, nil +} + +func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKey, status v2.BlobStatus) error { + for _, key := range keys { + err := d.blobMetadataStore.UpdateBlobStatus(ctx, key, status) + if err != nil { + d.logger.Error("failed to update blob status", "blobKey", key.Hex(), "status", status.String(), "err", err) + } + } + return nil +} + +func BuildMerkleTree(certs []*corev2.BlobCertificate) (*merkletree.MerkleTree, error) { + leafs := make([][]byte, len(certs)) + for i, cert := range certs { + leaf, err := cert.Hash() + if err != nil { + return nil, fmt.Errorf("failed to compute blob header hash: %w", err) + } + leafs[i] = leaf[:] + } + + tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New())) + if err != nil { + return nil, err + } + + return tree, nil +} + +// numBlobsAttestedByQuorum returns two values: +// 1. the number of blobs that have been successfully attested by all quorums +// 2. map[QuorumID]struct{} contains quorums that have been successfully attested by the quorum (has at least one blob attested in the quorum) +func numBlobsAttestedByQuorum(signedQuorums map[core.QuorumID]*core.QuorumResult, certs []*corev2.BlobCertificate) (int, map[core.QuorumID]struct{}) { + numPassed := 0 + quorums := make(map[core.QuorumID]struct{}) + for _, cert := range certs { + thisPassed := true + blobParams := corev2.ParametersMap[cert.BlobHeader.BlobVersion] + + for _, quorum := range cert.BlobHeader.QuorumNumbers { + if signedQuorums[quorum].PercentSigned < blobParams.ConfirmationThreshold { + thisPassed = false + } else { + quorums[quorum] = struct{}{} + } + } + if thisPassed { + numPassed++ + } + } + + return numPassed, quorums +} diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go new file mode 100644 index 0000000000..a19350aa66 --- /dev/null +++ b/disperser/controller/dispatcher_test.go @@ -0,0 +1,313 @@ +package controller_test + +import ( + "context" + "crypto/rand" + "encoding/hex" + "math/big" + "testing" + "time" + + clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/disperser/controller" + "github.com/Layr-Labs/eigenda/encoding" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/gammazero/workerpool" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/keccak256" +) + +var ( + opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") + opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") + mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ + 0: { + opId0: 1, + opId1: 1, + }, + 1: { + opId0: 1, + opId1: 3, + }, + }) + finalizationBlockDelay = uint64(10) +) + +type dispatcherComponents struct { + Dispatcher *controller.Dispatcher + BlobMetadataStore *blobstore.BlobMetadataStore + Pool common.WorkerPool + ChainReader *coremock.MockWriter + ChainState *coremock.ChainDataMock + SigAggregator *core.StdSignatureAggregator + NodeClientManager *controller.MockClientManager +} + +func TestDispatcherHandleBatch(t *testing.T) { + components := newDispatcherComponents(t) + objs := setupBlobCerts(t, components.BlobMetadataStore, 2) + ctx := context.Background() + + // Get batch header hash to mock signatures + merkleTree, err := controller.BuildMerkleTree(objs.blobCerts) + require.NoError(t, err) + require.NotNil(t, merkleTree) + require.NotNil(t, merkleTree.Root()) + batchHeader := &corev2.BatchHeader{ + ReferenceBlockNumber: blockNumber - finalizationBlockDelay, + } + copy(batchHeader.BatchRoot[:], merkleTree.Root()) + bhh, err := batchHeader.Hash() + require.NoError(t, err) + + mockClient0 := clientsmock.NewNodeClientV2() + sig0 := mockChainState.KeyPairs[opId0].SignMessage(bhh) + mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(sig0, nil) + components.NodeClientManager.On("GetClient", opId0, mock.Anything).Return(mockClient0, nil) + mockClient1 := clientsmock.NewNodeClientV2() + sig1 := mockChainState.KeyPairs[opId1].SignMessage(bhh) + mockClient1.On("StoreChunks", mock.Anything, mock.Anything).Return(sig1, nil) + components.NodeClientManager.On("GetClient", opId1, mock.Anything).Return(mockClient1, nil) + + sigChan, batchData, err := components.Dispatcher.HandleBatch(ctx) + require.NoError(t, err) + err = components.Dispatcher.HandleSignatures(ctx, batchData, sigChan) + require.NoError(t, err) + + // Test that the blob metadata status are updated + bm0, err := components.BlobMetadataStore.GetBlobMetadata(ctx, objs.blobKeys[0]) + require.NoError(t, err) + require.Equal(t, v2.Certified, bm0.BlobStatus) + bm1, err := components.BlobMetadataStore.GetBlobMetadata(ctx, objs.blobKeys[1]) + require.NoError(t, err) + require.Equal(t, v2.Certified, bm1.BlobStatus) + + // Get batch header + vis, err := components.BlobMetadataStore.GetBlobVerificationInfos(ctx, objs.blobKeys[0]) + require.NoError(t, err) + require.Len(t, vis, 1) + bhh, err = vis[0].BatchHeader.Hash() + require.NoError(t, err) + + // Test that attestation is written + att, err := components.BlobMetadataStore.GetAttestation(ctx, bhh) + require.NoError(t, err) + require.NotNil(t, att) + require.Equal(t, vis[0].BatchHeader, att.BatchHeader) + require.Greater(t, att.AttestedAt, uint64(0)) + require.Len(t, att.NonSignerPubKeys, 0) + require.NotNil(t, att.APKG2) + require.Len(t, att.QuorumAPKs, 2) + require.NotNil(t, att.Sigma) + require.ElementsMatch(t, att.QuorumNumbers, []core.QuorumID{0, 1}) +} + +func TestDispatcherNewBatch(t *testing.T) { + components := newDispatcherComponents(t) + objs := setupBlobCerts(t, components.BlobMetadataStore, 2) + require.Len(t, objs.blobHedaers, 2) + require.Len(t, objs.blobKeys, 2) + require.Len(t, objs.blobMetadatas, 2) + require.Len(t, objs.blobCerts, 2) + ctx := context.Background() + + batchData, err := components.Dispatcher.NewBatch(ctx, blockNumber) + require.NoError(t, err) + batch := batchData.Batch + bhh, keys, state := batchData.BatchHeaderHash, batchData.BlobKeys, batchData.OperatorState + require.NotNil(t, batch) + require.NotNil(t, batch.BatchHeader) + require.NotNil(t, bhh) + require.NotNil(t, keys) + require.NotNil(t, state) + require.ElementsMatch(t, keys, objs.blobKeys) + + // Test that the batch header hash is correct + hash, err := batch.BatchHeader.Hash() + require.NoError(t, err) + require.Equal(t, bhh, hash) + + // Test that the batch header is correct + require.Equal(t, blockNumber, batch.BatchHeader.ReferenceBlockNumber) + require.NotNil(t, batch.BatchHeader.BatchRoot) + + // Test that the batch header is written + bh, err := components.BlobMetadataStore.GetBatchHeader(ctx, bhh) + require.NoError(t, err) + require.NotNil(t, bh) + require.Equal(t, bh, batch.BatchHeader) + + // Test that blob verification infos are written + vi0, err := components.BlobMetadataStore.GetBlobVerificationInfo(ctx, objs.blobKeys[0], bhh) + require.NoError(t, err) + require.NotNil(t, vi0) + cert := batch.BlobCertificates[vi0.BlobIndex] + require.Equal(t, objs.blobHedaers[0], cert.BlobHeader) + require.Equal(t, objs.blobKeys[0], vi0.BlobKey) + require.Equal(t, bh, vi0.BatchHeader) + certHash, err := cert.Hash() + require.NoError(t, err) + proof, err := core.DeserializeMerkleProof(vi0.InclusionProof) + require.NoError(t, err) + verified, err := merkletree.VerifyProofUsing(certHash[:], false, proof, [][]byte{vi0.BatchRoot[:]}, keccak256.New()) + require.NoError(t, err) + require.True(t, verified) + + // Attempt to create a batch with the same blobs + _, err = components.Dispatcher.NewBatch(ctx, blockNumber) + require.ErrorContains(t, err, "no blobs to dispatch") +} + +func TestDispatcherBuildMerkleTree(t *testing.T) { + certs := []*corev2.BlobCertificate{ + { + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "account 1", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + Signature: []byte("signature"), + }, + RelayKeys: []corev2.RelayKey{0}, + }, + { + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0, 1}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "account 2", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + Signature: []byte("signature"), + }, + RelayKeys: []corev2.RelayKey{0, 1, 2}, + }, + } + merkleTree, err := controller.BuildMerkleTree(certs) + require.NoError(t, err) + require.NotNil(t, merkleTree) + require.NotNil(t, merkleTree.Root()) + + proof, err := merkleTree.GenerateProofWithIndex(uint64(0), 0) + require.NoError(t, err) + require.NotNil(t, proof) + hash, err := certs[0].Hash() + require.NoError(t, err) + verified, err := merkletree.VerifyProofUsing(hash[:], false, proof, [][]byte{merkleTree.Root()}, keccak256.New()) + require.NoError(t, err) + require.True(t, verified) + + proof, err = merkleTree.GenerateProofWithIndex(uint64(1), 0) + require.NoError(t, err) + require.NotNil(t, proof) + hash, err = certs[1].Hash() + require.NoError(t, err) + verified, err = merkletree.VerifyProofUsing(hash[:], false, proof, [][]byte{merkleTree.Root()}, keccak256.New()) + require.NoError(t, err) + require.True(t, verified) +} + +type testObjects struct { + blobHedaers []*corev2.BlobHeader + blobKeys []corev2.BlobKey + blobMetadatas []*v2.BlobMetadata + blobCerts []*corev2.BlobCertificate +} + +func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, numObjects int) *testObjects { + ctx := context.Background() + headers := make([]*corev2.BlobHeader, numObjects) + keys := make([]corev2.BlobKey, numObjects) + metadatas := make([]*v2.BlobMetadata, numObjects) + certs := make([]*corev2.BlobCertificate, numObjects) + for i := 0; i < numObjects; i++ { + randomBytes := make([]byte, 16) + _, err := rand.Read(randomBytes) + require.NoError(t, err) + randomBinIndex, err := rand.Int(rand.Reader, big.NewInt(1000)) + require.NoError(t, err) + binIndex := uint32(randomBinIndex.Uint64()) + headers[i] = &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0, 1}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: hex.EncodeToString(randomBytes), + BinIndex: binIndex, + CumulativePayment: big.NewInt(532), + }, + } + key, err := headers[i].BlobKey() + require.NoError(t, err) + keys[i] = key + now := time.Now() + metadatas[i] = &v2.BlobMetadata{ + BlobHeader: headers[i], + BlobStatus: v2.Encoded, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()) - uint64(i), + } + err = blobMetadataStore.PutBlobMetadata(ctx, metadatas[i]) + require.NoError(t, err) + + certs[i] = &corev2.BlobCertificate{ + BlobHeader: headers[i], + RelayKeys: []corev2.RelayKey{0, 1, 2}, + } + err = blobMetadataStore.PutBlobCertificate(ctx, certs[i], &encoding.FragmentInfo{}) + require.NoError(t, err) + } + + return &testObjects{ + blobHedaers: headers, + blobKeys: keys, + blobMetadatas: metadatas, + blobCerts: certs, + } +} + +func newDispatcherComponents(t *testing.T) *dispatcherComponents { + // logger := logging.NewNoopLogger() + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + pool := workerpool.New(5) + + chainReader := &coremock.MockWriter{} + chainReader.On("OperatorIDToAddress").Return(gethcommon.Address{0}, nil) + agg, err := core.NewStdSignatureAggregator(logger, chainReader) + require.NoError(t, err) + nodeClientManager := &controller.MockClientManager{} + mockChainState.On("GetCurrentBlockNumber").Return(uint(blockNumber), nil) + d, err := controller.NewDispatcher(controller.DispatcherConfig{ + PullInterval: 1 * time.Second, + FinalizationBlockDelay: finalizationBlockDelay, + NodeRequestTimeout: 1 * time.Second, + NumRequestRetries: 3, + NumConnectionsToNodes: 2, + }, blobMetadataStore, pool, mockChainState, agg, nodeClientManager, logger) + require.NoError(t, err) + return &dispatcherComponents{ + Dispatcher: d, + BlobMetadataStore: blobMetadataStore, + Pool: pool, + ChainReader: chainReader, + ChainState: mockChainState, + SigAggregator: agg, + NodeClientManager: nodeClientManager, + } +} diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 7ee6c72dea..28ce060b7d 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -22,7 +22,7 @@ import ( var errNoBlobsToEncode = errors.New("no blobs to encode") type EncodingManagerConfig struct { - PullInterval time.Duration + PullInterval time.Duration EncodingRequestTimeout time.Duration StoreTimeout time.Duration diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go index 725ad8d5a4..ed2e4dbcb9 100644 --- a/disperser/controller/encoding_manager_test.go +++ b/disperser/controller/encoding_manager_test.go @@ -22,7 +22,7 @@ import ( ) var ( - blockNumber = uint32(100) + blockNumber = uint64(100) ) type testComponents struct { @@ -91,7 +91,7 @@ func TestGetRelayKeys(t *testing.T) { } } -func TestHandleBatch(t *testing.T) { +func TestEncodingManagerHandleBatch(t *testing.T) { ctx := context.Background() blobHeader1 := &v2.BlobHeader{ BlobVersion: 0, @@ -141,14 +141,14 @@ func TestHandleBatch(t *testing.T) { assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4)) } -func TestHandleBatchNoBlobs(t *testing.T) { +func TestEncodingManagerHandleBatchNoBlobs(t *testing.T) { ctx := context.Background() c := newTestComponents(t) err := c.EncodingManager.HandleBatch(ctx) assert.ErrorContains(t, err, "no blobs to encode") } -func TestHandleBatchRetrySuccess(t *testing.T) { +func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) { ctx := context.Background() blobHeader1 := &v2.BlobHeader{ BlobVersion: 0, @@ -200,7 +200,7 @@ func TestHandleBatchRetrySuccess(t *testing.T) { c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2) } -func TestHandleBatchRetryFailure(t *testing.T) { +func TestEncodingManagerHandleBatchRetryFailure(t *testing.T) { ctx := context.Background() blobHeader1 := &v2.BlobHeader{ BlobVersion: 0, diff --git a/disperser/controller/mock_node_client_manager.go b/disperser/controller/mock_node_client_manager.go new file mode 100644 index 0000000000..9f6821255e --- /dev/null +++ b/disperser/controller/mock_node_client_manager.go @@ -0,0 +1,19 @@ +package controller + +import ( + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/core" + "github.com/stretchr/testify/mock" +) + +type MockClientManager struct { + mock.Mock +} + +var _ NodeClientManager = (*MockClientManager)(nil) + +func (m *MockClientManager) GetClient(operatorID core.OperatorID, socket string) (clients.NodeClientV2, error) { + args := m.Called(operatorID, socket) + client, _ := args.Get(0).(clients.NodeClientV2) + return client, args.Error(1) +} diff --git a/disperser/controller/node_client_manager.go b/disperser/controller/node_client_manager.go new file mode 100644 index 0000000000..bf51665f8e --- /dev/null +++ b/disperser/controller/node_client_manager.go @@ -0,0 +1,60 @@ +package controller + +import ( + "fmt" + + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigensdk-go/logging" + lru "github.com/hashicorp/golang-lru/v2" +) + +type NodeClientManager interface { + GetClient(operatorID core.OperatorID, socket string) (clients.NodeClientV2, error) +} + +type nodeClientManager struct { + nodeClients *lru.Cache[core.OperatorID, clients.NodeClientV2] + logger logging.Logger +} + +var _ NodeClientManager = (*nodeClientManager)(nil) + +func NewNodeClientManager(cacheSize int, logger logging.Logger) (*nodeClientManager, error) { + closeClient := func(key core.OperatorID, value clients.NodeClientV2) { + if err := value.Close(); err != nil { + logger.Error("failed to close node client", "err", err) + } + } + nodeClients, err := lru.NewWithEvict(cacheSize, closeClient) + if err != nil { + return nil, fmt.Errorf("failed to create LRU cache: %w", err) + } + + return &nodeClientManager{ + nodeClients: nodeClients, + logger: logger, + }, nil +} + +func (m *nodeClientManager) GetClient(operatorID core.OperatorID, socket string) (clients.NodeClientV2, error) { + client, ok := m.nodeClients.Get(operatorID) + if !ok { + host, dispersalPort, _, err := core.ParseOperatorSocket(socket) + if err != nil { + return nil, fmt.Errorf("failed to parse operator socket: %w", err) + } + + client, err = clients.NewNodeClientV2(&clients.NodeClientV2Config{ + Hostname: host, + Port: dispersalPort, + }) + if err != nil { + return nil, fmt.Errorf("failed to create node client at %s: %w", socket, err) + } + + m.nodeClients.Add(operatorID, client) + } + + return client, nil +} diff --git a/disperser/controller/node_client_manager_test.go b/disperser/controller/node_client_manager_test.go new file mode 100644 index 0000000000..f54d5fe93e --- /dev/null +++ b/disperser/controller/node_client_manager_test.go @@ -0,0 +1,43 @@ +package controller_test + +import ( + "testing" + + "github.com/Layr-Labs/eigenda/disperser/controller" + "github.com/stretchr/testify/assert" +) + +func TestNodeClientManager(t *testing.T) { + m, err := controller.NewNodeClientManager(2, nil) + assert.NoError(t, err) + + _, err = m.GetClient([32]byte{0}, "localhost:0000") + assert.ErrorContains(t, err, "failed to parse operator socket") + + client0, err := m.GetClient([32]byte{0}, "localhost:0000;1111") + assert.NoError(t, err) + assert.NotNil(t, client0) + + client1, err := m.GetClient([32]byte{0}, "localhost:0000;1111") + assert.NoError(t, err) + assert.NotNil(t, client1) + + assert.Same(t, client0, client1) + + // fill up the cache + client2, err := m.GetClient([32]byte{1}, "localhost:2222;3333") + assert.NoError(t, err) + assert.NotNil(t, client2) + + // evict client0 + client3, err := m.GetClient([32]byte{2}, "localhost:2222;3333") + assert.NoError(t, err) + assert.NotNil(t, client3) + + // accessing client0 again should create new client + client4, err := m.GetClient([32]byte{0}, "localhost:0000;1111") + assert.NoError(t, err) + assert.NotNil(t, client0) + + assert.NotSame(t, client0, client4) +}