diff --git a/common/read_only_map.go b/common/read_only_map.go new file mode 100644 index 0000000000..3e6e097054 --- /dev/null +++ b/common/read_only_map.go @@ -0,0 +1,26 @@ +package common + +type ReadOnlyMap[K comparable, V any] struct { + data map[K]V +} + +func NewReadOnlyMap[K comparable, V any](data map[K]V) *ReadOnlyMap[K, V] { + return &ReadOnlyMap[K, V]{data: data} +} + +func (m *ReadOnlyMap[K, V]) Get(key K) (V, bool) { + value, ok := m.data[key] + return value, ok +} + +func (m *ReadOnlyMap[K, V]) Keys() []K { + keys := make([]K, 0, len(m.data)) + for key := range m.data { + keys = append(keys, key) + } + return keys +} + +func (m *ReadOnlyMap[K, V]) Len() int { + return len(m.data) +} diff --git a/common/read_only_map_test.go b/common/read_only_map_test.go new file mode 100644 index 0000000000..8c1716eda3 --- /dev/null +++ b/common/read_only_map_test.go @@ -0,0 +1,31 @@ +package common_test + +import ( + "testing" + + "github.com/Layr-Labs/eigenda/common" + "github.com/stretchr/testify/require" +) + +func TestReadOnlyMap(t *testing.T) { + data := map[uint8]string{ + 1: "one", + 2: "two", + 3: "three", + } + m := common.NewReadOnlyMap(data) + res, ok := m.Get(1) + require.True(t, ok) + require.Equal(t, "one", res) + res, ok = m.Get(2) + require.True(t, ok) + require.Equal(t, "two", res) + res, ok = m.Get(3) + require.True(t, ok) + require.Equal(t, "three", res) + res, ok = m.Get(4) + require.False(t, ok) + require.Equal(t, "", res) + require.Equal(t, 3, m.Len()) + require.ElementsMatch(t, []uint8{1, 2, 3}, m.Keys()) +} diff --git a/core/chainio.go b/core/chainio.go index 6d820d6295..340e9e6b18 100644 --- a/core/chainio.go +++ b/core/chainio.go @@ -100,6 +100,12 @@ type Reader interface { // GetRequiredQuorumNumbers returns set of required quorum numbers GetRequiredQuorumNumbers(ctx context.Context, blockNumber uint32) ([]QuorumID, error) + // GetVersionedBlobParams returns the blob version parameters for the given blob version. 
+ GetVersionedBlobParams(ctx context.Context, blobVersion uint8) (*BlobVersionParameters, error) + + // GetAllVersionedBlobParams returns the blob version parameters for all blob versions. + GetAllVersionedBlobParams(ctx context.Context) (map[uint8]*BlobVersionParameters, error) + + // GetActiveReservations returns active reservations (end timestamp > current timestamp) GetActiveReservations(ctx context.Context, blockNumber uint32, accountIDs []string) (map[string]ActiveReservation, error) diff --git a/core/data.go b/core/data.go index 61f6be0e77..4303056902 100644 --- a/core/data.go +++ b/core/data.go @@ -612,3 +612,9 @@ type ActiveReservation struct { type OnDemandPayment struct { CumulativePayment *big.Int // Total amount deposited by the user } + +type BlobVersionParameters struct { + CodingRate uint32 + MaxNumOperators uint32 + NumChunks uint32 +} diff --git a/core/eth/reader.go b/core/eth/reader.go index b5bafca3c6..3f47e3c3cb 100644 --- a/core/eth/reader.go +++ b/core/eth/reader.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "math/big" + "strings" "github.com/Layr-Labs/eigenda/common" avsdir "github.com/Layr-Labs/eigenda/contracts/bindings/AVSDirectory" @@ -580,6 +581,41 @@ func (t *Reader) GetRequiredQuorumNumbers(ctx context.Context, blockNumber uint3 return requiredQuorums, nil } +func (t *Reader) GetVersionedBlobParams(ctx context.Context, blobVersion uint8) (*core.BlobVersionParameters, error) { + params, err := t.bindings.EigenDAServiceManager.GetBlobParams(&bind.CallOpts{ + Context: ctx, + }, uint16(blobVersion)) + if err != nil { + return nil, err + } + return &core.BlobVersionParameters{ + CodingRate: uint32(params.CodingRate), + NumChunks: uint32(params.NumChunks), + MaxNumOperators: uint32(params.MaxNumOperators), + }, nil +} + +func (t *Reader) GetAllVersionedBlobParams(ctx context.Context) (map[uint8]*core.BlobVersionParameters, error) { + res := make(map[uint8]*core.BlobVersionParameters) + version := uint8(0) + 
for { + params, err := t.GetVersionedBlobParams(ctx, version) + if err != nil && strings.Contains(err.Error(), "execution reverted") { + break + } else if err != nil { + return nil, err + } + res[version] = params + version++ + } + + if len(res) == 0 { + return nil, errors.New("no blob version parameters found") + } + + return res, nil +} + func (t *Reader) GetActiveReservations(ctx context.Context, blockNumber uint32, accountIDs []string) (map[string]core.ActiveReservation, error) { // contract is not implemented yet return map[string]core.ActiveReservation{}, nil diff --git a/core/mock/v2/validator.go b/core/mock/v2/validator.go index 591257a97c..e819c808b9 100644 --- a/core/mock/v2/validator.go +++ b/core/mock/v2/validator.go @@ -25,7 +25,7 @@ func (v *MockShardValidator) ValidateBatchHeader(ctx context.Context, header *co return args.Error(0) } -func (v *MockShardValidator) ValidateBlobs(ctx context.Context, blobs []*corev2.BlobShard, pool common.WorkerPool, state *core.OperatorState) error { +func (v *MockShardValidator) ValidateBlobs(ctx context.Context, blobs []*corev2.BlobShard, blobVersionParams *corev2.BlobVersionParameterMap, pool common.WorkerPool, state *core.OperatorState) error { args := v.Called() return args.Error(0) } diff --git a/core/mock/writer.go b/core/mock/writer.go index cd7f9a78f5..b700b8cd71 100644 --- a/core/mock/writer.go +++ b/core/mock/writer.go @@ -197,6 +197,21 @@ func (t *MockWriter) GetRequiredQuorumNumbers(ctx context.Context, blockNumber u return result.([]uint8), args.Error(1) } +func (t *MockWriter) GetVersionedBlobParams(ctx context.Context, blobVersion uint8) (*core.BlobVersionParameters, error) { + args := t.Called() + if args.Get(0) == nil { + return nil, args.Error(1) + } + result := args.Get(0) + return result.(*core.BlobVersionParameters), args.Error(1) +} + +func (t *MockWriter) GetAllVersionedBlobParams(ctx context.Context) (map[uint8]*core.BlobVersionParameters, error) { + args := t.Called() + result := args.Get(0) + 
return result.(map[uint8]*core.BlobVersionParameters), args.Error(1) +} + func (t *MockWriter) PubkeyHashToOperator(ctx context.Context, operatorId core.OperatorID) (gethcommon.Address, error) { args := t.Called() result := args.Get(0) diff --git a/core/v2/assignment.go b/core/v2/assignment.go index 53e6b96fb9..320406a18d 100644 --- a/core/v2/assignment.go +++ b/core/v2/assignment.go @@ -8,11 +8,9 @@ import ( "github.com/Layr-Labs/eigenda/core" ) -func GetAssignments(state *core.OperatorState, blobVersion BlobVersion, quorum uint8) (map[core.OperatorID]Assignment, error) { - - params, ok := ParametersMap[blobVersion] - if !ok { - return nil, fmt.Errorf("blob version %d not found", blobVersion) +func GetAssignments(state *core.OperatorState, blobParams *core.BlobVersionParameters, quorum uint8) (map[core.OperatorID]Assignment, error) { + if blobParams == nil { + return nil, fmt.Errorf("blob params cannot be nil") } ops, ok := state.Operators[quorum] @@ -20,12 +18,12 @@ func GetAssignments(state *core.OperatorState, blobVersion BlobVersion, quorum u return nil, fmt.Errorf("no operators found for quorum %d", quorum) } - if len(ops) > int(params.MaxNumOperators()) { - return nil, fmt.Errorf("too many operators for blob version %d", blobVersion) + if uint32(len(ops)) > blobParams.MaxNumOperators { + return nil, fmt.Errorf("too many operators (%d) to get assignments: max number of operators is %d", len(ops), blobParams.MaxNumOperators) } numOperators := big.NewInt(int64(len(ops))) - numChunks := big.NewInt(int64(params.NumChunks)) + numChunks := big.NewInt(int64(blobParams.NumChunks)) type assignment struct { id core.OperatorID @@ -58,9 +56,9 @@ func GetAssignments(state *core.OperatorState, blobVersion BlobVersion, quorum u mp += int(a.chunks) } - delta := int(params.NumChunks) - mp + delta := int(blobParams.NumChunks) - mp if delta < 0 { - return nil, fmt.Errorf("total chunks %d exceeds maximum %d", mp, params.NumChunks) + return nil, fmt.Errorf("total chunks %d 
exceeds maximum %d", mp, blobParams.NumChunks) } assignments := make(map[core.OperatorID]Assignment, len(chunkAssignments)) @@ -81,9 +79,11 @@ func GetAssignments(state *core.OperatorState, blobVersion BlobVersion, quorum u } -func GetAssignment(state *core.OperatorState, blobVersion BlobVersion, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { - - assignments, err := GetAssignments(state, blobVersion, quorum) +func GetAssignment(state *core.OperatorState, blobParams *core.BlobVersionParameters, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { + if blobParams == nil { + return Assignment{}, fmt.Errorf("blob params cannot be nil") + } + assignments, err := GetAssignments(state, blobParams, quorum) if err != nil { return Assignment{}, err } @@ -96,22 +96,21 @@ func GetAssignment(state *core.OperatorState, blobVersion BlobVersion, quorum co return assignment, nil } -func GetChunkLength(blobVersion BlobVersion, blobLength uint32) (uint32, error) { - +func GetChunkLength(blobLength uint32, blobParams *core.BlobVersionParameters) (uint32, error) { if blobLength == 0 { return 0, fmt.Errorf("blob length must be greater than 0") } + if blobParams == nil { + return 0, fmt.Errorf("blob params cannot be nil") + } + // Check that the blob length is a power of 2 if blobLength&(blobLength-1) != 0 { return 0, fmt.Errorf("blob length %d is not a power of 2", blobLength) } - if _, ok := ParametersMap[blobVersion]; !ok { - return 0, fmt.Errorf("blob version %d not found", blobVersion) - } - - chunkLength := blobLength * ParametersMap[blobVersion].CodingRate / ParametersMap[blobVersion].NumChunks + chunkLength := blobLength * blobParams.CodingRate / blobParams.NumChunks if chunkLength == 0 { chunkLength = 1 } diff --git a/core/v2/assignment_test.go b/core/v2/assignment_test.go index d8b5d2da81..5062f219ad 100644 --- a/core/v2/assignment_test.go +++ b/core/v2/assignment_test.go @@ -11,18 +11,12 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - 
maxNumOperators = 3537 -) - func TestOperatorAssignmentsV2(t *testing.T) { state := dat.GetTotalOperatorState(context.Background(), 0) operatorState := state.OperatorState - blobVersion := corev2.BlobVersion(0) - - assignments, err := corev2.GetAssignments(operatorState, blobVersion, 0) + assignments, err := corev2.GetAssignments(operatorState, blobParams, 0) assert.NoError(t, err) expectedAssignments := map[core.OperatorID]corev2.Assignment{ mock.MakeOperatorId(0): { @@ -55,7 +49,7 @@ func TestOperatorAssignmentsV2(t *testing.T) { assert.Equal(t, assignment, expectedAssignments[operatorID]) - assignment, err := corev2.GetAssignment(operatorState, blobVersion, 0, operatorID) + assignment, err := corev2.GetAssignment(operatorState, blobParams, 0, operatorID) assert.NoError(t, err) assert.Equal(t, assignment, expectedAssignments[operatorID]) @@ -64,20 +58,14 @@ func TestOperatorAssignmentsV2(t *testing.T) { } -func TestMaxNumOperators(t *testing.T) { - - assert.Equal(t, corev2.ParametersMap[0].MaxNumOperators(), uint32(maxNumOperators)) - -} - func TestAssignmentWithTooManyOperators(t *testing.T) { - numOperators := maxNumOperators + 1 + numOperators := blobParams.MaxNumOperators + 1 stakes := map[core.QuorumID]map[core.OperatorID]int{ 0: {}, } - for i := 0; i < numOperators; i++ { + for i := 0; i < int(numOperators); i++ { stakes[0][mock.MakeOperatorId(i)] = rand.Intn(100) + 1 } @@ -88,11 +76,9 @@ func TestAssignmentWithTooManyOperators(t *testing.T) { state := dat.GetTotalOperatorState(context.Background(), 0) - assert.Equal(t, len(state.Operators[0]), numOperators) - - blobVersion := corev2.BlobVersion(0) + assert.Equal(t, len(state.Operators[0]), int(numOperators)) - _, err = corev2.GetAssignments(state.OperatorState, blobVersion, 0) + _, err = corev2.GetAssignments(state.OperatorState, blobParams, 0) assert.Error(t, err) } @@ -110,7 +96,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { } for i := 0; i < 5; i++ { - f.Add(maxNumOperators) + 
f.Add(int(blobParams.MaxNumOperators)) } f.Fuzz(func(t *testing.T, numOperators int) { @@ -131,9 +117,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { state := dat.GetTotalOperatorState(context.Background(), 0) - blobVersion := corev2.BlobVersion(0) - - assignments, err := corev2.GetAssignments(state.OperatorState, blobVersion, 0) + assignments, err := corev2.GetAssignments(state.OperatorState, blobParams, 0) assert.NoError(t, err) // Check that the total number of chunks is correct @@ -141,7 +125,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { for _, assignment := range assignments { totalChunks += assignment.NumChunks } - assert.Equal(t, totalChunks, corev2.ParametersMap[blobVersion].NumChunks) + assert.Equal(t, totalChunks, blobParams.NumChunks) // Check that each operator's assignment satisfies the security requirement for operatorID, assignment := range assignments { @@ -149,21 +133,16 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { totalStake := uint32(state.Totals[0].Stake.Uint64()) myStake := uint32(state.Operators[0][operatorID].Stake.Uint64()) - LHS := assignment.NumChunks * totalStake * corev2.ParametersMap[blobVersion].CodingRate * uint32(corev2.ParametersMap[blobVersion].ReconstructionThreshold*100) - RHS := 100 * myStake * corev2.ParametersMap[blobVersion].NumChunks + reconstructionThreshold := 0.22 + LHS := assignment.NumChunks * totalStake * blobParams.CodingRate * uint32(reconstructionThreshold*100) + RHS := 100 * myStake * blobParams.NumChunks assert.GreaterOrEqual(t, LHS, RHS) - } - }) - } func TestChunkLength(t *testing.T) { - - blobVersion := corev2.BlobVersion(0) - pairs := []struct { blobLength uint32 chunkLength uint32 @@ -176,20 +155,13 @@ func TestChunkLength(t *testing.T) { } for _, pair := range pairs { - - chunkLength, err := corev2.GetChunkLength(blobVersion, pair.blobLength) - + chunkLength, err := corev2.GetChunkLength(pair.blobLength, blobParams) assert.NoError(t, err) - assert.Equal(t, pair.chunkLength, chunkLength) } - } 
func TestInvalidChunkLength(t *testing.T) { - - blobVersion := corev2.BlobVersion(0) - invalidLengths := []uint32{ 0, 3, @@ -212,9 +184,7 @@ func TestInvalidChunkLength(t *testing.T) { } for _, length := range invalidLengths { - - _, err := corev2.GetChunkLength(blobVersion, length) + _, err := corev2.GetChunkLength(length, blobParams) assert.Error(t, err) } - } diff --git a/core/v2/blob_params.go b/core/v2/blob_params.go new file mode 100644 index 0000000000..5b16a61295 --- /dev/null +++ b/core/v2/blob_params.go @@ -0,0 +1,12 @@ +package v2 + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" +) + +type BlobVersionParameterMap = common.ReadOnlyMap[BlobVersion, *core.BlobVersionParameters] + +func NewBlobVersionParameterMap(params map[BlobVersion]*core.BlobVersionParameters) *BlobVersionParameterMap { + return common.NewReadOnlyMap(params) +} diff --git a/core/v2/core_test.go b/core/v2/core_test.go index 54f4e02e45..ebc6eadb77 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/mock" corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/encoding/kzg/prover" @@ -32,6 +33,15 @@ var ( v encoding.Verifier GETTYSBURG_ADDRESS_BYTES = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. 
It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.") + + blobParams = &core.BlobVersionParameters{ + NumChunks: 8192, + CodingRate: 8, + MaxNumOperators: 3537, + } + blobParamsMap = v2.NewBlobVersionParameterMap(map[corev2.BlobVersion]*core.BlobVersionParameters{ + 0: blobParams, + }) ) func TestMain(m *testing.M) { @@ -134,7 +144,7 @@ func prepareBlobs( blob := blobs[z] header := cert.BlobHeader - params, err := header.GetEncodingParams() + params, err := header.GetEncodingParams(blobParams) if err != nil { t.Fatal(err) } @@ -153,7 +163,7 @@ func prepareBlobs( for _, quorum := range header.QuorumNumbers { - assignments, err := corev2.GetAssignments(state, header.BlobVersion, quorum) + assignments, err := corev2.GetAssignments(state, blobParams, quorum) if err != nil { t.Fatal(err) } @@ -216,11 +226,11 @@ func checkBatchByUniversalVerifier( for id := range state.IndexedOperators { - val := corev2.NewShardValidator(v, id) + val := corev2.NewShardValidator(v, id, logging.NewNoopLogger()) blobs := packagedBlobs[id] - err := val.ValidateBlobs(ctx, blobs, pool, 
state.OperatorState) + err := val.ValidateBlobs(ctx, blobs, blobParamsMap, pool, state.OperatorState) if err != nil { errList = multierror.Append(errList, err) } diff --git a/core/v2/types.go b/core/v2/types.go index aeb3c34fc5..796ab06b51 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -4,7 +4,6 @@ import ( "encoding/hex" "errors" "fmt" - "math" "math/big" "strings" @@ -16,15 +15,7 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" ) -var ( - // TODO(mooselumph): Put these parameters on chain and add on-chain checks to ensure that the number of operators does not - // conflict with the existing on-chain limits - ParametersMap = map[BlobVersion]BlobVersionParameters{ - 0: {CodingRate: 8, ReconstructionThreshold: 0.22, NumChunks: 8192}, - } -) - -type BlobVersion uint8 +type BlobVersion = uint8 // Assignment contains information about the set of chunks that a specific node will receive type Assignment struct { @@ -158,16 +149,14 @@ func (b *BlobHeader) ToProtobuf() (*commonpb.BlobHeader, error) { }, nil } -func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) { - params := ParametersMap[b.BlobVersion] - - length, err := GetChunkLength(b.BlobVersion, uint32(b.BlobCommitments.Length)) +func (b *BlobHeader) GetEncodingParams(blobParams *core.BlobVersionParameters) (encoding.EncodingParams, error) { + length, err := GetChunkLength(uint32(b.BlobCommitments.Length), blobParams) if err != nil { return encoding.EncodingParams{}, err } return encoding.EncodingParams{ - NumChunks: uint64(params.NumChunks), + NumChunks: uint64(blobParams.NumChunks), ChunkLength: uint64(length), }, nil } @@ -380,16 +369,6 @@ func (v *BlobVerificationInfo) ToProtobuf(blobCert *BlobCertificate) (*disperser }, nil } -type BlobVersionParameters struct { - CodingRate uint32 - ReconstructionThreshold float64 - NumChunks uint32 -} - -func (p BlobVersionParameters) MaxNumOperators() uint32 { - return uint32(math.Floor(float64(p.NumChunks) * (1 - 
1/(p.ReconstructionThreshold*float64(p.CodingRate))))) -} - // DispersalRequest is a request to disperse a batch to a specific operator type DispersalRequest struct { core.OperatorID `dynamodbav:"-"` diff --git a/core/v2/validator.go b/core/v2/validator.go index cf16d9f41c..97c42e767e 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigensdk-go/logging" ) var ( @@ -18,7 +19,7 @@ var ( type ShardValidator interface { ValidateBatchHeader(ctx context.Context, header *BatchHeader, blobCerts []*BlobCertificate) error - ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error + ValidateBlobs(ctx context.Context, blobs []*BlobShard, blobVersionParams *BlobVersionParameterMap, pool common.WorkerPool, state *core.OperatorState) error } type BlobShard struct { @@ -30,18 +31,20 @@ type BlobShard struct { type shardValidator struct { verifier encoding.Verifier operatorID core.OperatorID + logger logging.Logger } var _ ShardValidator = (*shardValidator)(nil) -func NewShardValidator(v encoding.Verifier, operatorID core.OperatorID) *shardValidator { +func NewShardValidator(v encoding.Verifier, operatorID core.OperatorID, logger logging.Logger) *shardValidator { return &shardValidator{ verifier: v, operatorID: operatorID, + logger: logger, } } -func (v *shardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) { +func (v *shardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, blobParams *core.BlobVersionParameters, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) { // Check if the operator is a member of the quorum if _, ok := operatorState.Operators[quorum]; !ok { @@ -49,7 +52,7 @@ func (v *shardValidator) 
validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Get the assignments for the quorum - assignment, err := GetAssignment(operatorState, blob.BlobHeader.BlobVersion, quorum, v.operatorID) + assignment, err := GetAssignment(operatorState, blobParams, quorum, v.operatorID) if err != nil { return nil, nil, err } @@ -63,7 +66,7 @@ func (v *shardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Get the chunk length - chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length)) + chunkLength, err := GetChunkLength(uint32(blob.BlobHeader.BlobCommitments.Length), blobParams) if err != nil { return nil, nil, fmt.Errorf("invalid chunk length: %w", err) } @@ -99,7 +102,15 @@ func (v *shardValidator) ValidateBatchHeader(ctx context.Context, header *BatchH return nil } -func (v *shardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error { +func (v *shardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, blobVersionParams *BlobVersionParameterMap, pool common.WorkerPool, state *core.OperatorState) error { + if len(blobs) == 0 { + return fmt.Errorf("no blobs") + } + + if blobVersionParams == nil { + return fmt.Errorf("blob version params is nil") + } + var err error subBatchMap := make(map[encoding.EncodingParams]*encoding.SubBatch) blobCommitmentList := make([]encoding.BlobCommitments, len(blobs)) @@ -114,49 +125,55 @@ func (v *shardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, // for each quorum for _, quorum := range blob.BlobHeader.QuorumNumbers { - chunks, assignment, err := v.validateBlobQuorum(quorum, blob, state) - if err != nil { + blobParams, ok := blobVersionParams.Get(blob.BlobHeader.BlobVersion) + if !ok { + return fmt.Errorf("blob version %d not found", blob.BlobHeader.BlobVersion) + } + chunks, assignment, err := v.validateBlobQuorum(quorum, blob, blobParams, state) + if 
errors.Is(err, ErrBlobQuorumSkip) { + v.logger.Warn("Skipping blob for quorum", "quorum", quorum, "err", err) + continue + } else if err != nil { return err } + // TODO: Define params for the blob - params, err := blob.BlobHeader.GetEncodingParams() + params, err := blob.BlobHeader.GetEncodingParams(blobParams) if err != nil { return err } - if errors.Is(err, ErrBlobQuorumSkip) { - continue - } else if err != nil { + if err != nil { return err - } else { - // Check the received chunks against the commitment - blobIndex := 0 - subBatch, ok := subBatchMap[params] - if ok { - blobIndex = subBatch.NumBlobs - } + } + + // Check the received chunks against the commitment + blobIndex := 0 + subBatch, ok := subBatchMap[params] + if ok { + blobIndex = subBatch.NumBlobs + } - indices := assignment.GetIndices() - samples := make([]encoding.Sample, len(chunks)) - for ind := range chunks { - samples[ind] = encoding.Sample{ - Commitment: blob.BlobHeader.BlobCommitments.Commitment, - Chunk: chunks[ind], - AssignmentIndex: uint(indices[ind]), - BlobIndex: blobIndex, - } + indices := assignment.GetIndices() + samples := make([]encoding.Sample, len(chunks)) + for ind := range chunks { + samples[ind] = encoding.Sample{ + Commitment: blob.BlobHeader.BlobCommitments.Commitment, + Chunk: chunks[ind], + AssignmentIndex: uint(indices[ind]), + BlobIndex: blobIndex, } + } - // update subBatch - if !ok { - subBatchMap[params] = &encoding.SubBatch{ - Samples: samples, - NumBlobs: 1, - } - } else { - subBatch.Samples = append(subBatch.Samples, samples...) - subBatch.NumBlobs += 1 + // update subBatch + if !ok { + subBatchMap[params] = &encoding.SubBatch{ + Samples: samples, + NumBlobs: 1, } + } else { + subBatch.Samples = append(subBatch.Samples, samples...) 
+ subBatch.NumBlobs += 1 } } } diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index 5fc160b677..05034122cf 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -14,7 +14,12 @@ import ( ) func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) { - if err := s.validateDispersalRequest(req); err != nil { + onchainState := s.onchainState.Load() + if onchainState == nil { + return nil, api.NewErrorInternal("onchain state is nil") + } + + if err := s.validateDispersalRequest(req, onchainState); err != nil { return nil, err } @@ -27,9 +32,9 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl // TODO(ian-shim): handle payments and check rate limits - blobKey, err := s.StoreBlob(ctx, data, blobHeader, time.Now()) + blobKey, err := s.StoreBlob(ctx, data, blobHeader, time.Now(), onchainState.TTL) if err != nil { - return nil, api.NewErrorInternal(err.Error()) + return nil, err } return &pb.DisperseBlobReply{ @@ -38,20 +43,20 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl }, nil } -func (s *DispersalServerV2) StoreBlob(ctx context.Context, data []byte, blobHeader *corev2.BlobHeader, requestedAt time.Time) (corev2.BlobKey, error) { +func (s *DispersalServerV2) StoreBlob(ctx context.Context, data []byte, blobHeader *corev2.BlobHeader, requestedAt time.Time, ttl time.Duration) (corev2.BlobKey, error) { blobKey, err := blobHeader.BlobKey() if err != nil { - return corev2.BlobKey{}, err + return corev2.BlobKey{}, api.NewErrorInvalidArg(fmt.Sprintf("failed to get blob key: %v", err)) } if err := s.blobStore.StoreBlob(ctx, blobKey, data); err != nil { - return corev2.BlobKey{}, err + return corev2.BlobKey{}, api.NewErrorInternal(fmt.Sprintf("failed to store blob: %v", err)) } blobMetadata := &dispv2.BlobMetadata{ BlobHeader: blobHeader, BlobStatus: 
dispv2.Queued, - Expiry: uint64(requestedAt.Add(s.onchainState.TTL).Unix()), + Expiry: uint64(requestedAt.Add(ttl).Unix()), NumRetries: 0, BlobSize: uint64(len(data)), RequestedAt: uint64(requestedAt.UnixNano()), @@ -61,7 +66,7 @@ func (s *DispersalServerV2) StoreBlob(ctx context.Context, data []byte, blobHead return blobKey, err } -func (s *DispersalServerV2) validateDispersalRequest(req *pb.DisperseBlobRequest) error { +func (s *DispersalServerV2) validateDispersalRequest(req *pb.DisperseBlobRequest, onchainState *OnchainState) error { data := req.GetData() blobSize := len(data) if blobSize == 0 { @@ -81,13 +86,13 @@ func (s *DispersalServerV2) validateDispersalRequest(req *pb.DisperseBlobRequest return api.NewErrorInvalidArg("blob header must contain at least one quorum number") } - if len(blobHeaderProto.GetQuorumNumbers()) > int(s.onchainState.QuorumCount) { - return api.NewErrorInvalidArg(fmt.Sprintf("too many quorum numbers specified: maximum is %d", s.onchainState.QuorumCount)) + if len(blobHeaderProto.GetQuorumNumbers()) > int(onchainState.QuorumCount) { + return api.NewErrorInvalidArg(fmt.Sprintf("too many quorum numbers specified: maximum is %d", onchainState.QuorumCount)) } for _, quorum := range blobHeaderProto.GetQuorumNumbers() { - if quorum > corev2.MaxQuorumID || uint8(quorum) >= s.onchainState.QuorumCount { - return api.NewErrorInvalidArg(fmt.Sprintf("invalid quorum number %d; maximum is %d", quorum, s.onchainState.QuorumCount)) + if quorum > corev2.MaxQuorumID || uint8(quorum) >= onchainState.QuorumCount { + return api.NewErrorInvalidArg(fmt.Sprintf("invalid quorum number %d; maximum is %d", quorum, onchainState.QuorumCount)) } } @@ -98,12 +103,8 @@ func (s *DispersalServerV2) validateDispersalRequest(req *pb.DisperseBlobRequest return api.NewErrorInvalidArg("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 
21888242871839275222246405745257275088548364400416034343698204186575808495617") } - if _, ok := s.onchainState.BlobVersionParameters[corev2.BlobVersion(blobHeaderProto.GetVersion())]; !ok { - validVersions := make([]int32, 0, len(s.onchainState.BlobVersionParameters)) - for version := range s.onchainState.BlobVersionParameters { - validVersions = append(validVersions, int32(version)) - } - return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob version %d; valid blob versions are: %v", blobHeaderProto.GetVersion(), validVersions)) + if _, ok := onchainState.BlobVersionParameters.Get(corev2.BlobVersion(blobHeaderProto.GetVersion())); !ok { + return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob version %d; valid blob versions are: %v", blobHeaderProto.GetVersion(), onchainState.BlobVersionParameters.Keys())) } blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto) diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index e3cbf733f2..acf85b0542 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "sync/atomic" "time" "github.com/Layr-Labs/eigenda/api" @@ -14,6 +15,7 @@ import ( healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/encoding" @@ -25,7 +27,7 @@ import ( type OnchainState struct { QuorumCount uint8 RequiredQuorums []core.QuorumID - BlobVersionParameters map[corev2.BlobVersion]corev2.BlobVersionParameters + BlobVersionParameters *corev2.BlobVersionParameterMap TTL time.Duration } @@ -42,7 +44,7 @@ type DispersalServerV2 struct { logger logging.Logger // state - onchainState OnchainState + onchainState atomic.Pointer[OnchainState] maxNumSymbolsPerBlob uint64 
onchainStateRefreshInterval time.Duration } @@ -73,7 +75,6 @@ func NewDispersalServerV2( prover: prover, logger: logger, - onchainState: OnchainState{}, maxNumSymbolsPerBlob: maxNumSymbolsPerBlob, onchainStateRefreshInterval: onchainStateRefreshInterval, } @@ -190,12 +191,19 @@ func (s *DispersalServerV2) RefreshOnchainState(ctx context.Context) error { if err != nil || storeDurationBlocks == 0 { return fmt.Errorf("failed to get STORE_DURATION_BLOCKS: %w", err) } - s.onchainState = OnchainState{ - QuorumCount: quorumCount, - RequiredQuorums: requiredQuorums, - // TODO(ian-shim): this should be fetched from chain - BlobVersionParameters: corev2.ParametersMap, + + blobParams, err := s.chainReader.GetAllVersionedBlobParams(ctx) + if err != nil { + return fmt.Errorf("failed to get blob version parameters: %w", err) + } + onchainState := &OnchainState{ + QuorumCount: quorumCount, + RequiredQuorums: requiredQuorums, + BlobVersionParameters: v2.NewBlobVersionParameterMap(blobParams), TTL: time.Duration((storeDurationBlocks+blockStaleMeasure)*12) * time.Second, } + + s.onchainState.Store(onchainState) + return nil } diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index 5ef2fe8dac..77afef0085 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -16,6 +16,7 @@ import ( auth "github.com/Layr-Labs/eigenda/core/auth/v2" "github.com/Layr-Labs/eigenda/core/mock" corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/apiserver" dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" @@ -435,6 +436,13 @@ func newTestServerV2(t *testing.T) *testComponents { chainReader.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil) chainReader.On("GetBlockStaleMeasure", tmock.Anything).Return(uint32(10), nil) chainReader.On("GetStoreDurationBlocks", 
tmock.Anything).Return(uint32(100), nil) + chainReader.On("GetAllVersionedBlobParams", tmock.Anything).Return(map[v2.BlobVersion]*core.BlobVersionParameters{ + 0: { + NumChunks: 8192, + CodingRate: 8, + MaxNumOperators: 3537, + }, + }, nil) s := apiserver.NewDispersalServerV2(disperser.ServerConfig{ GrpcPort: "51002", diff --git a/disperser/cmd/controller/config.go b/disperser/cmd/controller/config.go index 3f146631b5..db1a94629c 100644 --- a/disperser/cmd/controller/config.go +++ b/disperser/cmd/controller/config.go @@ -63,14 +63,15 @@ func NewConfig(ctx *cli.Context) (Config, error) { AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), LoggerConfig: *loggerConfig, EncodingManagerConfig: controller.EncodingManagerConfig{ - PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name), - EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name), - StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name), - NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name), - NumRelayAssignment: uint16(numRelayAssignments), - AvailableRelays: relays, - EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name), - MaxNumBlobsPerIteration: int32(ctx.GlobalInt(flags.MaxNumBlobsPerIterationFlag.Name)), + PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name), + EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name), + StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name), + NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name), + NumRelayAssignment: uint16(numRelayAssignments), + AvailableRelays: relays, + EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name), + MaxNumBlobsPerIteration: int32(ctx.GlobalInt(flags.MaxNumBlobsPerIterationFlag.Name)), + OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshIntervalFlag.Name), }, DispatcherConfig: controller.DispatcherConfig{ PullInterval: 
ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name), diff --git a/disperser/cmd/controller/flags/flags.go b/disperser/cmd/controller/flags/flags.go index 71654cc4d3..7a54153aa6 100644 --- a/disperser/cmd/controller/flags/flags.go +++ b/disperser/cmd/controller/flags/flags.go @@ -109,6 +109,13 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NUM_BLOBS_PER_ITERATION"), Value: 128, } + OnchainStateRefreshIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "onchain-state-refresh-interval"), + Usage: "Interval at which to refresh the onchain state", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ONCHAIN_STATE_REFRESH_INTERVAL"), + Value: 1 * time.Hour, + } // Dispatcher Flags DispatcherPullIntervalFlag = cli.DurationFlag{ @@ -188,6 +195,7 @@ var optionalFlags = []cli.Flag{ NumRelayAssignmentFlag, NumConcurrentEncodingRequestsFlag, MaxNumBlobsPerIterationFlag, + OnchainStateRefreshIntervalFlag, FinalizationBlockDelayFlag, NumRequestRetriesFlag, diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 551f6af14f..a6e015051b 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "math/rand" + "sync/atomic" "time" "github.com/Layr-Labs/eigenda/common" @@ -36,6 +37,8 @@ type EncodingManagerConfig struct { EncoderAddress string // MaxNumBlobsPerIteration is the maximum number of blobs to encode per iteration MaxNumBlobsPerIteration int32 + // OnchainStateRefreshInterval is the interval at which the onchain state is refreshed + OnchainStateRefreshInterval time.Duration } // EncodingManager is responsible for pulling queued blobs from the blob @@ -52,7 +55,8 @@ type EncodingManager struct { logger logging.Logger // state - cursor *blobstore.StatusIndexCursor + cursor *blobstore.StatusIndexCursor + blobVersionParameters atomic.Pointer[corev2.BlobVersionParameterMap] } func NewEncodingManager( @@ -84,6 
+88,30 @@ func NewEncodingManager( } func (e *EncodingManager) Start(ctx context.Context) error { + // Refresh blob version parameters + err := e.refreshBlobVersionParams(ctx) + if err != nil { + return fmt.Errorf("failed to refresh blob version parameters: %w", err) + } + + go func() { + ticker := time.NewTicker(e.EncodingManagerConfig.OnchainStateRefreshInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + e.logger.Info("refreshing blob version params") + if err := e.refreshBlobVersionParams(ctx); err != nil { + e.logger.Error("failed to refresh blob version params", "err", err) + } + case <-ctx.Done(): + return + } + } + }() + + // Start the encoding loop go func() { ticker := time.NewTicker(e.PullInterval) defer ticker.Stop() @@ -118,6 +146,11 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { return errNoBlobsToEncode } + blobVersionParams := e.blobVersionParameters.Load() + if blobVersionParams == nil { + return fmt.Errorf("blob version parameters is nil") + } + for _, blob := range blobMetadatas { blob := blob blobKey, err := blob.BlobHeader.BlobKey() @@ -126,11 +159,17 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { continue } + blobParams, ok := blobVersionParams.Get(blob.BlobHeader.BlobVersion) + if !ok { + e.logger.Error("failed to get blob version parameters", "version", blob.BlobHeader.BlobVersion) + continue + } + // Encode the blobs e.pool.Submit(func() { for i := 0; i < e.NumEncodingRetries+1; i++ { encodingCtx, cancel := context.WithTimeout(ctx, e.EncodingRequestTimeout) - fragmentInfo, err := e.encodeBlob(encodingCtx, blobKey, blob) + fragmentInfo, err := e.encodeBlob(encodingCtx, blobKey, blob, blobParams) cancel() if err != nil { e.logger.Error("failed to encode blob", "blobKey", blobKey.Hex(), "err", err) @@ -183,14 +222,25 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { return nil } -func (e *EncodingManager) encodeBlob(ctx context.Context, blobKey 
corev2.BlobKey, blob *v2.BlobMetadata) (*encoding.FragmentInfo, error) { - encodingParams, err := blob.BlobHeader.GetEncodingParams() +func (e *EncodingManager) encodeBlob(ctx context.Context, blobKey corev2.BlobKey, blob *v2.BlobMetadata, blobParams *core.BlobVersionParameters) (*encoding.FragmentInfo, error) { + encodingParams, err := blob.BlobHeader.GetEncodingParams(blobParams) if err != nil { return nil, fmt.Errorf("failed to get encoding params: %w", err) } return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams) } +func (e *EncodingManager) refreshBlobVersionParams(ctx context.Context) error { + e.logger.Debug("Refreshing blob version params") + blobParams, err := e.chainReader.GetAllVersionedBlobParams(ctx) + if err != nil { + return fmt.Errorf("failed to get blob version parameters: %w", err) + } + + e.blobVersionParameters.Store(corev2.NewBlobVersionParameterMap(blobParams)) + return nil +} + func GetRelayKeys(numAssignment uint16, availableRelays []corev2.RelayKey) ([]corev2.RelayKey, error) { if int(numAssignment) > len(availableRelays) { return nil, fmt.Errorf("numAssignment (%d) cannot be greater than numRelays (%d)", numAssignment, len(availableRelays)) diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go index 4724cf410c..7ab6b25730 100644 --- a/disperser/controller/encoding_manager_test.go +++ b/disperser/controller/encoding_manager_test.go @@ -7,8 +7,10 @@ import ( "github.com/Layr-Labs/eigenda/common" commonmock "github.com/Layr-Labs/eigenda/common/mock" + "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" dispcommon "github.com/Layr-Labs/eigenda/disperser/common" commonv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/disperser/controller" @@ -294,16 +296,30 @@ func newTestComponents(t *testing.T, mockPool bool) *testComponents 
{ encodingClient := dispmock.NewMockEncoderClientV2() chainReader := &coremock.MockWriter{} chainReader.On("GetCurrentBlockNumber").Return(blockNumber, nil) + chainReader.On("GetAllVersionedBlobParams", mock.Anything).Return(map[v2.BlobVersion]*core.BlobVersionParameters{ + 0: { + NumChunks: 8192, + CodingRate: 8, + MaxNumOperators: 3537, + }, + }, nil) + onchainRefreshInterval := 1 * time.Millisecond em, err := controller.NewEncodingManager(&controller.EncodingManagerConfig{ - PullInterval: 1 * time.Second, - EncodingRequestTimeout: 5 * time.Second, - StoreTimeout: 5 * time.Second, - NumEncodingRetries: 1, - NumRelayAssignment: 2, - AvailableRelays: []corev2.RelayKey{0, 1, 2, 3}, - MaxNumBlobsPerIteration: 5, + PullInterval: 1 * time.Second, + EncodingRequestTimeout: 5 * time.Second, + StoreTimeout: 5 * time.Second, + NumEncodingRetries: 1, + NumRelayAssignment: 2, + AvailableRelays: []corev2.RelayKey{0, 1, 2, 3}, + MaxNumBlobsPerIteration: 5, + OnchainStateRefreshInterval: onchainRefreshInterval, }, blobMetadataStore, pool, encodingClient, chainReader, logger) - require.NoError(t, err) + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 2*onchainRefreshInterval) + defer cancel() + // Start the encoding manager to fetch the onchain state + _ = em.Start(ctx) return &testComponents{ EncodingManager: em, Pool: pool, diff --git a/disperser/encoder/server_v2_test.go b/disperser/encoder/server_v2_test.go index 57a850b2c0..69b30b7448 100644 --- a/disperser/encoder/server_v2_test.go +++ b/disperser/encoder/server_v2_test.go @@ -24,6 +24,12 @@ import ( "golang.org/x/exp/rand" ) +var blobParams = &core.BlobVersionParameters{ + NumChunks: 8192, + CodingRate: 8, + MaxNumOperators: 3537, +} + type testComponents struct { encoderServer *encoder.EncoderServerV2 blobStore *blobstore.BlobStore @@ -58,8 +64,8 @@ func TestEncodeBlob(t *testing.T) { ) var ( - codingRatio = corev2.ParametersMap[0].CodingRate - numChunks = 
corev2.ParametersMap[0].NumChunks + codingRatio = blobParams.CodingRate + numChunks = blobParams.NumChunks ) ctx, cancel := context.WithTimeout(context.Background(), timeoutSeconds*time.Second) @@ -85,7 +91,7 @@ func TestEncodeBlob(t *testing.T) { blobLength := encoding.GetBlobLength(blobSize) // Get chunk length for blob version 0 - chunkLength, err := corev2.GetChunkLength(0, core.NextPowerOf2(uint32(blobLength))) + chunkLength, err := corev2.GetChunkLength(core.NextPowerOf2(uint32(blobLength)), blobParams) if !assert.NoError(t, err, "Failed to get chunk length") { t.FailNow() } diff --git a/node/config.go b/node/config.go index e67fc7894f..7d73bdcb20 100644 --- a/node/config.go +++ b/node/config.go @@ -89,7 +89,8 @@ type Config struct { LoggerConfig common.LoggerConfig EncoderConfig kzg.KzgConfig - EnableV2 bool + EnableV2 bool + OnchainStateRefreshInterval time.Duration } // NewConfig parses the Config from the provided flags or environment variables and @@ -235,5 +236,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) { BLSSignerTLSCertFilePath: ctx.GlobalString(flags.BLSSignerCertFileFlag.Name), BLSRemoteSignerEnabled: blsRemoteSignerEnabled, EnableV2: ctx.GlobalBool(flags.EnableV2Flag.Name), + OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshIntervalFlag.Name), }, nil } diff --git a/node/flags/flags.go b/node/flags/flags.go index 5bcd95a98b..b16e2709e9 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -224,6 +224,13 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_V2"), } + OnchainStateRefreshIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "onchain-state-refresh-interval"), + Usage: "The interval at which to refresh the onchain state. 
This flag is only relevant in v2 (default: 1h)", + Required: false, + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ONCHAIN_STATE_REFRESH_INTERVAL"), + Value: 1 * time.Hour, + } // Test only, DO NOT USE the following flags in production @@ -353,6 +360,7 @@ var optionalFlags = []cli.Flag{ BLSPublicKeyHexFlag, BLSSignerCertFileFlag, EnableV2Flag, + OnchainStateRefreshIntervalFlag, } func init() { diff --git a/node/grpc/server_v2_test.go b/node/grpc/server_v2_test.go index 6bb15870c6..249bd6bb77 100644 --- a/node/grpc/server_v2_test.go +++ b/node/grpc/server_v2_test.go @@ -29,6 +29,17 @@ import ( "google.golang.org/grpc/status" ) +var ( + blobParams = &core.BlobVersionParameters{ + NumChunks: 8192, + CodingRate: 8, + MaxNumOperators: 3537, + } + blobParamsMap = map[v2.BlobVersion]*core.BlobVersionParameters{ + 0: blobParams, + } +) + type testComponents struct { server *grpc.ServerV2 node *node.Node @@ -67,6 +78,7 @@ func newTestComponents(t *testing.T, config *node.Config) *testComponents { ValidatorV2: val, RelayClient: relay, } + node.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParamsMap)) server := grpc.NewServerV2(config, node, logger, ratelimiter) return &testComponents{ server: server, diff --git a/node/node.go b/node/node.go index d22058c7a5..925fbe4517 100644 --- a/node/node.go +++ b/node/node.go @@ -14,6 +14,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "github.com/Layr-Labs/eigenda/common/kvstore/tablestore" @@ -36,6 +37,7 @@ import ( "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/core/indexer" corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/Layr-Labs/eigensdk-go/metrics" rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls" @@ -79,6 +81,10 @@ type Node struct { mu sync.Mutex CurrentSocket string + + // BlobVersionParams is a map of blob version parameters loaded from the chain. 
+ // It is used to determine blob parameters based on the version number. + BlobVersionParams atomic.Pointer[corev2.BlobVersionParameterMap] } // NewNode creates a new Node with the provided config. @@ -177,7 +183,7 @@ func NewNode( } asgn := &core.StdAssignmentCoordinator{} validator := core.NewShardValidator(v, asgn, cst, config.ID) - validatorV2 := corev2.NewShardValidator(v, config.ID) + validatorV2 := corev2.NewShardValidator(v, config.ID, logger) // Resolve the BLOCK_STALE_MEASURE and STORE_DURATION_BLOCKS. var blockStaleMeasure, storeDurationBlocks uint32 @@ -217,7 +223,31 @@ func NewNode( "eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding) var relayClient clients.RelayClient + + n := &Node{ + Config: config, + Logger: nodeLogger, + KeyPair: keyPair, + Metrics: metrics, + NodeApi: nodeApi, + Store: store, + ChainState: cst, + Transactor: tx, + Validator: validator, + ValidatorV2: validatorV2, + PubIPProvider: pubIPProvider, + OperatorSocketsFilterer: socketsFilterer, + ChainID: chainID, + RelayClient: relayClient, + BLSSigner: blsClient, + } + + if !config.EnableV2 { + return n, nil + } + var storeV2 StoreV2 + var blobVersionParams *corev2.BlobVersionParameterMap if config.EnableV2 { v2Path := config.DbPath + "/chunk_v2" dbV2, err := tablestore.Start(logger, &tablestore.Config{ @@ -233,27 +263,18 @@ func NewNode( } storeV2 = NewLevelDBStoreV2(dbV2, logger) + blobParams, err := tx.GetAllVersionedBlobParams(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get versioned blob parameters: %w", err) + } + blobVersionParams = corev2.NewBlobVersionParameterMap(blobParams) + // TODO(ian-shim): Create a new relay client with relay addresses onchain } - return &Node{ - Config: config, - Logger: nodeLogger, - KeyPair: keyPair, - Metrics: metrics, - NodeApi: nodeApi, - Store: store, - 
StoreV2: storeV2, - ChainState: cst, - Transactor: tx, - Validator: validator, - ValidatorV2: validatorV2, - PubIPProvider: pubIPProvider, - OperatorSocketsFilterer: socketsFilterer, - ChainID: chainID, - RelayClient: relayClient, - BLSSigner: blsClient, - }, nil + n.StoreV2 = storeV2 + n.BlobVersionParams.Store(blobVersionParams) + return n, nil } // Start starts the Node. If the node is not registered, register it on chain, otherwise just @@ -271,6 +292,12 @@ func (n *Node) Start(ctx context.Context) error { go n.expireLoop() go n.checkNodeReachability() + if n.Config.EnableV2 { + go func() { + _ = n.RefreshOnchainState(ctx) + }() + } + // Build the socket based on the hostname/IP provided in the CLI socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort)) var operator *Operator @@ -354,6 +381,30 @@ func (n *Node) expireLoop() { } } +func (n *Node) RefreshOnchainState(ctx context.Context) error { + if !n.Config.EnableV2 || n.Config.OnchainStateRefreshInterval <= 0 { + return nil + } + ticker := time.NewTicker(n.Config.OnchainStateRefreshInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + n.Logger.Info("Refreshing onchain state") + blobParams, err := n.Transactor.GetAllVersionedBlobParams(ctx) + if err != nil { + n.Logger.Error("error fetching blob params", "err", err) + continue + } + + n.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParams)) + case <-ctx.Done(): + return ctx.Err() + } + } +} + // ProcessBatch validates the batch is correct, stores data into the node's Store, and then returns a signature for the entire batch. // // The batch will be itemized into batch header, header and chunks of each blob in the batch. 
These items will diff --git a/node/node_test.go b/node/node_test.go index 85ae43ab64..f2607362f0 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/node" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -20,6 +21,15 @@ import ( var ( privateKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" opID = [32]byte{0} + + blobParams = &core.BlobVersionParameters{ + NumChunks: 8192, + CodingRate: 8, + MaxNumOperators: 3537, + } + blobParamsMap = map[v2.BlobVersion]*core.BlobVersionParameters{ + 0: blobParams, + } ) type components struct { @@ -72,18 +82,20 @@ func newComponents(t *testing.T) *components { } defer os.Remove(dbPath) relayClient := clientsmock.NewRelayClient() + n := &node.Node{ + Config: config, + Logger: logger, + KeyPair: keyPair, + Metrics: nil, + Store: store, + ChainState: chainState, + Validator: mockVal, + Transactor: tx, + RelayClient: relayClient, + } + n.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParamsMap)) return &components{ - node: &node.Node{ - Config: config, - Logger: logger, - KeyPair: keyPair, - Metrics: nil, - Store: store, - ChainState: chainState, - Validator: mockVal, - Transactor: tx, - RelayClient: relayClient, - }, + node: n, tx: tx, relayClient: relayClient, } diff --git a/node/node_v2.go b/node/node_v2.go index 5dd91992a5..c37c4c4647 100644 --- a/node/node_v2.go +++ b/node/node_v2.go @@ -38,6 +38,11 @@ func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch, operato return nil, nil, fmt.Errorf("relay client is not set") } + blobVersionParams := n.BlobVersionParams.Load() + if blobVersionParams == nil { + return nil, nil, fmt.Errorf("blob version params is nil") + } + blobShards := make([]*corev2.BlobShard, 
len(batch.BlobCertificates)) rawBundles := make([]*RawBundles, len(batch.BlobCertificates)) requests := make(map[corev2.RelayKey]*relayRequest) @@ -61,7 +66,11 @@ func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch, operato relayIndex := rand.Intn(len(cert.RelayKeys)) relayKey := cert.RelayKeys[relayIndex] for _, quorum := range cert.BlobHeader.QuorumNumbers { - assgn, err := corev2.GetAssignment(operatorState, batch.BlobCertificates[0].BlobHeader.BlobVersion, quorum, n.Config.ID) + blobParams, ok := blobVersionParams.Get(cert.BlobHeader.BlobVersion) + if !ok { + return nil, nil, fmt.Errorf("blob version %d not found", cert.BlobHeader.BlobVersion) + } + assgn, err := corev2.GetAssignment(operatorState, blobParams, quorum, n.Config.ID) if err != nil { return nil, nil, fmt.Errorf("failed to get assignments: %v", err) } @@ -145,5 +154,6 @@ func (n *Node) ValidateBatchV2( return fmt.Errorf("failed to validate batch header: %v", err) } pool := workerpool.New(n.Config.NumBatchValidators) - return n.ValidatorV2.ValidateBlobs(ctx, blobShards, pool, operatorState) + blobVersionParams := n.BlobVersionParams.Load() + return n.ValidatorV2.ValidateBlobs(ctx, blobShards, blobVersionParams, pool, operatorState) } diff --git a/node/node_v2_test.go b/node/node_v2_test.go index 3fc9795dde..cc56550439 100644 --- a/node/node_v2_test.go +++ b/node/node_v2_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/core" @@ -122,6 +123,38 @@ func TestDownloadBundlesFail(t *testing.T) { require.Nil(t, rawBundles) } +func TestRefreshOnchainState(t *testing.T) { + c := newComponents(t) + c.node.Config.EnableV2 = true + c.node.Config.OnchainStateRefreshInterval = time.Millisecond + ctx := context.Background() + bp, ok := c.node.BlobVersionParams.Load().Get(0) + require.True(t, ok) + require.Equal(t, bp, blobParams) + _, ok = c.node.BlobVersionParams.Load().Get(1) + require.False(t, ok) + + 
newCtx, cancel := context.WithTimeout(ctx, c.node.Config.OnchainStateRefreshInterval*2) + defer cancel() + blobParams2 := &core.BlobVersionParameters{ + NumChunks: 111, + CodingRate: 1, + MaxNumOperators: 222, + } + c.tx.On("GetAllVersionedBlobParams", mock.Anything).Return(map[v2.BlobVersion]*core.BlobVersionParameters{ + 0: blobParams, + 1: blobParams2, + }, nil) + err := c.node.RefreshOnchainState(newCtx) + require.ErrorIs(t, err, context.DeadlineExceeded) + bp, ok = c.node.BlobVersionParams.Load().Get(0) + require.True(t, ok) + require.Equal(t, bp, blobParams) + bp, ok = c.node.BlobVersionParams.Load().Get(1) + require.True(t, ok) + require.Equal(t, bp, blobParams2) +} + func bundleEqual(t *testing.T, expected, actual core.Bundle) { for i := range expected { frameEqual(t, expected[i], actual[i]) diff --git a/relay/cmd/config.go b/relay/cmd/config.go index 42078db96a..13f56e7ada 100644 --- a/relay/cmd/config.go +++ b/relay/cmd/config.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/geth" @@ -80,9 +81,10 @@ func NewConfig(ctx *cli.Context) (Config, error) { GetChunkBytesBurstinessClient: ctx.Int(flags.GetChunkBytesBurstinessClientFlag.Name), MaxConcurrentGetChunkOpsClient: ctx.Int(flags.MaxConcurrentGetChunkOpsClientFlag.Name), }, - AuthenticationKeyCacheSize: ctx.Int(flags.AuthenticationKeyCacheSizeFlag.Name), - AuthenticationTimeout: ctx.Duration(flags.AuthenticationTimeoutFlag.Name), - AuthenticationDisabled: ctx.Bool(flags.AuthenticationDisabledFlag.Name), + AuthenticationKeyCacheSize: ctx.Int(flags.AuthenticationKeyCacheSizeFlag.Name), + AuthenticationTimeout: ctx.Duration(flags.AuthenticationTimeoutFlag.Name), + AuthenticationDisabled: ctx.Bool(flags.AuthenticationDisabledFlag.Name), + OnchainStateRefreshInterval: ctx.Duration(flags.OnchainStateRefreshIntervalFlag.Name), }, EthClientConfig: geth.ReadEthClientConfig(ctx), 
BLSOperatorStateRetrieverAddr: ctx.String(flags.BlsOperatorStateRetrieverAddrFlag.Name), diff --git a/relay/cmd/flags/flags.go b/relay/cmd/flags/flags.go index 57fcc6cbd0..f471765966 100644 --- a/relay/cmd/flags/flags.go +++ b/relay/cmd/flags/flags.go @@ -1,11 +1,12 @@ package flags import ( + "time" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/geth" "github.com/urfave/cli" - "time" ) const ( @@ -230,6 +231,13 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "AUTHENTICATION_DISABLED"), } + OnchainStateRefreshIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "onchain-state-refresh-interval"), + Usage: "The interval at which to refresh the onchain state", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ONCHAIN_STATE_REFRESH_INTERVAL"), + Value: 1 * time.Hour, + } ) var requiredFlags = []cli.Flag{ @@ -268,6 +276,7 @@ var optionalFlags = []cli.Flag{ AuthenticationKeyCacheSizeFlag, AuthenticationTimeoutFlag, AuthenticationDisabledFlag, + OnchainStateRefreshIntervalFlag, } var Flags []cli.Flag diff --git a/relay/cmd/main.go b/relay/cmd/main.go index eb39cdb03a..bffdd9fa9e 100644 --- a/relay/cmd/main.go +++ b/relay/cmd/main.go @@ -3,14 +3,13 @@ package main import ( "context" "fmt" + "log" + "os" + "github.com/Layr-Labs/eigenda/common/geth" - "github.com/Layr-Labs/eigenda/core" coreeth "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigensdk-go/logging" gethcommon "github.com/ethereum/go-ethereum/common" - "log" - "os" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" @@ -70,11 +69,19 @@ func RunRelay(ctx *cli.Context) error { metadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.MetadataTableName) blobStore := blobstore.NewBlobStore(config.BucketName, s3Client, logger) chunkReader := chunkstore.NewChunkReader(logger, 
s3Client, config.BucketName) - ics, err := buildICS(logger, &config) + client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger) if err != nil { - return fmt.Errorf("failed to build ics: %w", err) + return fmt.Errorf("failed to create eth client: %w", err) } + tx, err := coreeth.NewWriter(logger, client, config.BLSOperatorStateRetrieverAddr, config.EigenDAServiceManagerAddr) + if err != nil { + return fmt.Errorf("failed to create eth writer: %w", err) + } + + cs := coreeth.NewChainState(tx, client) + ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + server, err := relay.NewServer( context.Background(), logger, @@ -82,33 +89,17 @@ func RunRelay(ctx *cli.Context) error { metadataStore, blobStore, chunkReader, - ics) + tx, + ics, + ) if err != nil { return fmt.Errorf("failed to create relay server: %w", err) } - err = server.Start() + err = server.Start(context.Background()) if err != nil { return fmt.Errorf("failed to start relay server: %w", err) } return nil } - -func buildICS(logger logging.Logger, config *Config) (core.IndexedChainState, error) { - client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger) - if err != nil { - logger.Error("Cannot create chain.Client", "err", err) - return nil, err - } - - tx, err := coreeth.NewWriter(logger, client, config.BLSOperatorStateRetrieverAddr, config.EigenDAServiceManagerAddr) - if err != nil { - return nil, fmt.Errorf("failed to create eth writer: %w", err) - } - - cs := coreeth.NewChainState(tx, client) - ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) - - return ics, nil -} diff --git a/relay/metadata_provider.go b/relay/metadata_provider.go index e70531702d..f86bccfddd 100644 --- a/relay/metadata_provider.go +++ b/relay/metadata_provider.go @@ -40,6 +40,9 @@ type metadataProvider struct { // relayIDSet is the set of relay IDs assigned to this relay. 
This relay will refuse to serve metadata for blobs // that are not assigned to one of these IDs. relayIDSet map[v2.RelayKey]struct{} + + // blobParamsMap is a map of blob version to blob version parameters. + blobParamsMap atomic.Pointer[v2.BlobVersionParameterMap] } // newMetadataProvider creates a new metadataProvider. @@ -49,7 +52,9 @@ func newMetadataProvider( metadataStore *blobstore.BlobMetadataStore, metadataCacheSize int, maxIOConcurrency int, - relayIDs []v2.RelayKey) (*metadataProvider, error) { + relayIDs []v2.RelayKey, + blobParamsMap *v2.BlobVersionParameterMap, +) (*metadataProvider, error) { relayIDSet := make(map[v2.RelayKey]struct{}, len(relayIDs)) for _, id := range relayIDs { @@ -62,6 +67,7 @@ func newMetadataProvider( metadataStore: metadataStore, relayIDSet: relayIDSet, } + server.blobParamsMap.Store(blobParamsMap) metadataCache, err := cache.NewCachedAccessor[v2.BlobKey, *blobMetadata]( metadataCacheSize, @@ -141,8 +147,17 @@ func (m *metadataProvider) GetMetadataForBlobs(keys []v2.BlobKey) (metadataMap, return mMap, nil } +func (m *metadataProvider) UpdateBlobVersionParameters(blobParamsMap *v2.BlobVersionParameterMap) { + m.blobParamsMap.Store(blobParamsMap) +} + // fetchMetadata retrieves metadata about a blob. Fetches from the cache if available, otherwise from the store. func (m *metadataProvider) fetchMetadata(key v2.BlobKey) (*blobMetadata, error) { + blobParamsMap := m.blobParamsMap.Load() + if blobParamsMap == nil { + return nil, fmt.Errorf("blob version parameters is nil") + } + // Retrieve the metadata from the store. 
cert, fragmentInfo, err := m.metadataStore.GetBlobCertificate(m.ctx, key) if err != nil { @@ -165,7 +180,11 @@ func (m *metadataProvider) fetchMetadata(key v2.BlobKey) (*blobMetadata, error) // TODO(cody-littley): blob size is not correct https://github.com/Layr-Labs/eigenda/pull/906#discussion_r1847396530 blobSize := uint32(cert.BlobHeader.BlobCommitments.Length) * encoding.BYTES_PER_SYMBOL - chunkSize, err := v2.GetChunkLength(cert.BlobHeader.BlobVersion, blobSize) + blobParams, ok := blobParamsMap.Get(cert.BlobHeader.BlobVersion) + if !ok { + return nil, fmt.Errorf("blob version %d not found in blob params map", cert.BlobHeader.BlobVersion) + } + chunkSize, err := v2.GetChunkLength(blobSize, blobParams) chunkSize *= encoding.BYTES_PER_SYMBOL if err != nil { return nil, fmt.Errorf("error getting chunk length: %w", err) diff --git a/relay/metadata_provider_test.go b/relay/metadata_provider_test.go index 32e5a3e80c..228ec83264 100644 --- a/relay/metadata_provider_test.go +++ b/relay/metadata_provider_test.go @@ -23,7 +23,7 @@ func TestGetNonExistentBlob(t *testing.T) { defer teardown() metadataStore := buildMetadataStore(t) - server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil) + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil, v2.NewBlobVersionParameterMap(mockBlobParamsMap())) require.NoError(t, err) // Try to fetch a non-existent blobs @@ -81,7 +81,7 @@ func TestFetchingIndividualMetadata(t *testing.T) { require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) } - server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil) + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil, v2.NewBlobVersionParameterMap(mockBlobParamsMap())) require.NoError(t, err) // Fetch the metadata from the server. 
@@ -157,7 +157,7 @@ func TestBatchedFetch(t *testing.T) { require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) } - server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil) + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil, v2.NewBlobVersionParameterMap(mockBlobParamsMap())) require.NoError(t, err) // Each iteration, choose a random subset of the keys to fetch @@ -255,7 +255,7 @@ func TestIndividualFetchWithSharding(t *testing.T) { require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) } - server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList) + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList, v2.NewBlobVersionParameterMap(mockBlobParamsMap())) require.NoError(t, err) // Fetch the metadata from the server. @@ -379,7 +379,7 @@ func TestBatchedFetchWithSharding(t *testing.T) { require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) } - server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList) + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList, v2.NewBlobVersionParameterMap(mockBlobParamsMap())) require.NoError(t, err) // Each iteration, choose two random keys to fetch. There will be a 25% chance that both blobs map to valid shards. 
diff --git a/relay/relay_test_utils.go b/relay/relay_test_utils.go index 9e99abe929..2a075a8294 100644 --- a/relay/relay_test_utils.go +++ b/relay/relay_test_utils.go @@ -20,6 +20,8 @@ import ( test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" "github.com/Layr-Labs/eigenda/common/aws/s3" tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/encoding" @@ -32,6 +34,7 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" "github.com/google/uuid" "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -176,6 +179,24 @@ func buildChunkStore(t *testing.T, logger logging.Logger) (chunkstore.ChunkReade return chunkReader, chunkWriter } +func newMockChainReader() *coremock.MockWriter { + w := &coremock.MockWriter{} + w.On("GetAllVersionedBlobParams", mock.Anything).Return(mockBlobParamsMap(), nil) + return w +} + +func mockBlobParamsMap() map[uint8]*core.BlobVersionParameters { + blobParams := &core.BlobVersionParameters{ + NumChunks: 8192, + CodingRate: 8, + MaxNumOperators: 3537, + } + + return map[v2.BlobVersion]*core.BlobVersionParameters{ + 0: blobParams, + } +} + func randomBlob(t *testing.T) (*v2.BlobHeader, []byte) { data := tu.RandomBytes(225) // TODO talk to Ian about this diff --git a/relay/server.go b/relay/server.go index 730a13f82d..32267de763 100644 --- a/relay/server.go +++ b/relay/server.go @@ -54,6 +54,9 @@ type Server struct { // authenticator is used to authenticate requests to the relay service. authenticator auth.RequestAuthenticator + + // chainReader is the core.Reader used to fetch blob parameters. + chainReader core.Reader } type Config struct { @@ -104,6 +107,9 @@ type Config struct { // AuthenticationDisabled will disable authentication if set to true. 
AuthenticationDisabled bool + + // OnchainStateRefreshInterval is the interval at which the onchain state is refreshed. + OnchainStateRefreshInterval time.Duration } // NewServer creates a new relay Server. @@ -114,7 +120,17 @@ func NewServer( metadataStore *blobstore.BlobMetadataStore, blobStore *blobstore.BlobStore, chunkReader chunkstore.ChunkReader, - ics core.IndexedChainState) (*Server, error) { + chainReader core.Reader, + ics core.IndexedChainState, +) (*Server, error) { + if chainReader == nil { + return nil, errors.New("chainReader is required") + } + + blobParams, err := chainReader.GetAllVersionedBlobParams(ctx) + if err != nil { + return nil, fmt.Errorf("error fetching blob params: %w", err) + } mp, err := newMetadataProvider( ctx, @@ -122,7 +138,9 @@ func NewServer( metadataStore, config.MetadataCacheSize, config.MetadataMaxConcurrency, - config.RelayIDs) + config.RelayIDs, + v2.NewBlobVersionParameterMap(blobParams), + ) if err != nil { return nil, fmt.Errorf("error creating metadata provider: %w", err) } @@ -399,7 +417,13 @@ func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap met } // Start starts the server listening for requests. This method will block until the server is stopped. 
-func (s *Server) Start() error {
+func (s *Server) Start(ctx context.Context) error {
+	if s.chainReader != nil && s.metadataProvider != nil {
+		go func() {
+			_ = s.RefreshOnchainState(ctx)
+		}()
+	}
+
 	// Serve grpc requests
 	addr := fmt.Sprintf("0.0.0.0:%d", s.config.GRPCPort)
 	listener, err := net.Listen("tcp", addr)
@@ -426,6 +450,31 @@
 	return nil
 }
 
+// RefreshOnchainState periodically fetches the versioned blob parameters from the
+// chain and installs them in the metadata provider. It blocks until ctx is cancelled.
+func (s *Server) RefreshOnchainState(ctx context.Context) error {
+	if s.config.OnchainStateRefreshInterval <= 0 {
+		return errors.New("OnchainStateRefreshInterval must be positive")
+	}
+	ticker := time.NewTicker(s.config.OnchainStateRefreshInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			s.logger.Info("refreshing onchain state")
+			blobParams, err := s.chainReader.GetAllVersionedBlobParams(ctx)
+			if err != nil {
+				s.logger.Error("error fetching blob params", "err", err)
+				continue
+			}
+			s.metadataProvider.UpdateBlobVersionParameters(v2.NewBlobVersionParameterMap(blobParams))
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
 // Stop stops the server.
 func (s *Server) Stop() {
 	if s.grpcServer != nil {
diff --git a/relay/server_test.go b/relay/server_test.go
index f67a8fb650..baaf873150 100644
--- a/relay/server_test.go
+++ b/relay/server_test.go
@@ -94,6 +94,7 @@ func TestReadWriteBlobs(t *testing.T) {
 	// These are used to write data to S3/dynamoDB
 	metadataStore := buildMetadataStore(t)
 	blobStore := buildBlobStore(t, logger)
+	chainReader := newMockChainReader()
 
 	// This is the server used to read it back
 	config := defaultConfig()
@@ -104,11 +105,12 @@ func TestReadWriteBlobs(t *testing.T) {
 		metadataStore,
 		blobStore,
 		nil, /* not used in this test*/
+		chainReader,
 		nil /* not used in this test*/)
 	require.NoError(t, err)
 
 	go func() {
-		err = server.Start()
+		err = server.Start(context.Background())
 		require.NoError(t, err)
 	}()
 	defer server.Stop()
@@ -175,6 +177,7 @@ func TestReadNonExistentBlob(t *testing.T) {
 
 	// This is the server used to read it back
 	config := defaultConfig()
+	chainReader := newMockChainReader()
 	server, err := NewServer(
 		context.Background(),
 		logger,
@@ -182,11
+185,12 @@ func TestReadNonExistentBlob(t *testing.T) { metadataStore, blobStore, nil, /* not used in this test */ + chainReader, nil /* not used in this test*/) require.NoError(t, err) go func() { - err = server.Start() + err = server.Start(context.Background()) require.NoError(t, err) }() defer server.Stop() @@ -228,6 +232,7 @@ func TestReadWriteBlobsWithSharding(t *testing.T) { // This is the server used to read it back config := defaultConfig() config.RelayIDs = shardList + chainReader := newMockChainReader() server, err := NewServer( context.Background(), logger, @@ -235,11 +240,12 @@ func TestReadWriteBlobsWithSharding(t *testing.T) { metadataStore, blobStore, nil, /* not used in this test*/ + chainReader, nil /* not used in this test*/) require.NoError(t, err) go func() { - err = server.Start() + err = server.Start(context.Background()) require.NoError(t, err) }() defer server.Stop() @@ -346,6 +352,7 @@ func TestReadWriteChunks(t *testing.T) { config.RateLimits.GetChunkOpsBurstiness = 1000 config.RateLimits.MaxGetChunkOpsPerSecondClient = 1000 config.RateLimits.GetChunkOpsBurstinessClient = 1000 + chainReader := newMockChainReader() server, err := NewServer( context.Background(), logger, @@ -353,11 +360,12 @@ func TestReadWriteChunks(t *testing.T) { metadataStore, nil, /* not used in this test*/ chunkReader, + chainReader, nil /* not used in this test*/) require.NoError(t, err) go func() { - err = server.Start() + err = server.Start(context.Background()) require.NoError(t, err) }() defer server.Stop() @@ -542,6 +550,7 @@ func TestBatchedReadWriteChunks(t *testing.T) { // This is the server used to read it back config := defaultConfig() + chainReader := newMockChainReader() server, err := NewServer( context.Background(), logger, @@ -549,11 +558,12 @@ func TestBatchedReadWriteChunks(t *testing.T) { metadataStore, nil, /* not used in this test */ chunkReader, + chainReader, nil /* not used in this test*/) require.NoError(t, err) go func() { - err = 
server.Start() + err = server.Start(context.Background()) require.NoError(t, err) }() defer server.Stop() @@ -668,6 +678,7 @@ func TestReadWriteChunksWithSharding(t *testing.T) { config.RateLimits.GetChunkOpsBurstiness = 1000 config.RateLimits.MaxGetChunkOpsPerSecondClient = 1000 config.RateLimits.GetChunkOpsBurstinessClient = 1000 + chainReader := newMockChainReader() server, err := NewServer( context.Background(), logger, @@ -675,11 +686,12 @@ func TestReadWriteChunksWithSharding(t *testing.T) { metadataStore, nil, /* not used in this test*/ chunkReader, + chainReader, nil /* not used in this test*/) require.NoError(t, err) go func() { - err = server.Start() + err = server.Start(context.Background()) require.NoError(t, err) }() defer server.Stop() @@ -943,6 +955,7 @@ func TestBatchedReadWriteChunksWithSharding(t *testing.T) { config.RateLimits.GetChunkOpsBurstiness = 1000 config.RateLimits.MaxGetChunkOpsPerSecondClient = 1000 config.RateLimits.GetChunkOpsBurstinessClient = 1000 + chainReader := newMockChainReader() server, err := NewServer( context.Background(), logger, @@ -950,11 +963,12 @@ func TestBatchedReadWriteChunksWithSharding(t *testing.T) { metadataStore, nil, /* not used in this test */ chunkReader, + chainReader, nil /* not used in this test*/) require.NoError(t, err) go func() { - err = server.Start() + err = server.Start(context.Background()) require.NoError(t, err) }() defer server.Stop()