From 30b6bb432502a7e90d1584a64e5c60e81a007ee3 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Wed, 20 Mar 2024 10:11:44 -0700 Subject: [PATCH] Use finalized reference block (#350) --- .../src/core/EigenDAServiceManagerStorage.sol | 12 +++- disperser/batcher/batcher.go | 3 + disperser/batcher/batcher_test.go | 5 +- disperser/batcher/encoding_streamer.go | 6 ++ disperser/batcher/encoding_streamer_test.go | 13 ++-- disperser/cmd/batcher/config.go | 1 + disperser/cmd/batcher/flags/flags.go | 8 +++ inabox/deploy/config.go | 16 +---- inabox/deploy/env_vars.go | 60 ++++++++++--------- inabox/templates/testconfig-anvil-docker.yaml | 1 - inabox/templates/testconfig-anvil.yaml | 1 - 11 files changed, 74 insertions(+), 52 deletions(-) diff --git a/contracts/src/core/EigenDAServiceManagerStorage.sol b/contracts/src/core/EigenDAServiceManagerStorage.sol index a8ec957e82..d3d2f97467 100644 --- a/contracts/src/core/EigenDAServiceManagerStorage.sol +++ b/contracts/src/core/EigenDAServiceManagerStorage.sol @@ -23,8 +23,18 @@ abstract contract EigenDAServiceManagerStorage is IEigenDAServiceManager { * @dev BLOCK_STALE_MEASURE should be greater than the number of blocks till finalization, but not too much greater, as it is the amount of * time that nodes can be active after they have deregistered. The larger it is, the farther back stakes can be used, but the longer operators * have to serve after they've deregistered. + * + * Note that this parameter needs to accommodate the delays which are introduced by the disperser, which are of two types: + * - FinalizationBlockDelay: when initializing a batch, the disperser will use a ReferenceBlockNumber which is this many + * blocks behind the current block number. This is to ensure that the the operator state associated with the reference block + * will be stable. + * - BatchInterval: the batch itself will only be confirmed after the batch interval has passed. + * + * Currently, we use a FinalizationBlockDelay of 75 blocks and a BatchInterval of 50 blocks, + * So using a BLOCK_STALE_MEASURE of 300 should be sufficient to ensure that the batch is not + * stale when it is confirmed. */ - uint32 public constant BLOCK_STALE_MEASURE = 150; + uint32 public constant BLOCK_STALE_MEASURE = 300; /** * @notice The quorum adversary threshold percentages stored as an ordered bytes array diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 6f99ed0f28..72fb3b663b 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -57,6 +57,8 @@ type Config struct { BatchSizeMBLimit uint MaxNumRetriesPerBlob uint + FinalizationBlockDelay uint + TargetNumChunks uint MaxBlobsToFetchFromStore int } @@ -110,6 +112,7 @@ func NewBatcher( EncodingQueueLimit: config.EncodingRequestQueueSize, TargetNumChunks: config.TargetNumChunks, MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore, + FinalizationBlockDelay: config.FinalizationBlockDelay, } encodingWorkerPool := workerpool.New(config.NumConnections) encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 7feccb7ac0..9b5749b2b2 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -73,10 +73,12 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. // Common Components logger := logging.NewNoopLogger() + finalizationBlockDelay := uint(75) + // Core Components cst, err := coremock.MakeChainDataMock(10) assert.NoError(t, err) - cst.On("GetCurrentBlockNumber").Return(uint(10), nil) + cst.On("GetCurrentBlockNumber").Return(uint(10)+finalizationBlockDelay, nil) asgn := &core.StdAssignmentCoordinator{} transactor := &coremock.MockTransactor{} transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil) @@ -99,6 +101,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. BatchSizeMBLimit: 100, SRSOrder: 3000, MaxNumRetriesPerBlob: 2, + FinalizationBlockDelay: finalizationBlockDelay, } timeoutConfig := bat.TimeoutConfig{ EncodingTimeout: 10 * time.Second, diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index b87e62160a..c4bf7d7e75 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -46,6 +46,8 @@ type StreamerConfig struct { // Maximum number of Blobs to fetch from store MaxBlobsToFetchFromStore int + + FinalizationBlockDelay uint } type EncodingStreamer struct { @@ -208,6 +210,10 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan if err != nil { return fmt.Errorf("failed to get current block number, won't request encoding: %w", err) } else { + if blockNumber > e.FinalizationBlockDelay { + blockNumber -= e.FinalizationBlockDelay + } + e.mu.Lock() e.ReferenceBlockNumber = blockNumber e.mu.Unlock() diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 7b3276ea0f..0277ec5ffa 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -26,6 +26,7 @@ var ( EncodingRequestTimeout: 5 * time.Second, EncodingQueueLimit: 100, MaxBlobsToFetchFromStore: 10, + FinalizationBlockDelay: 75, } ) @@ -206,7 +207,7 @@ func TestStreamingEncoding(t *testing.T) { assert.Nil(t, err) assert.Equal(t, disperser.Processing, metadata.BlobStatus) - c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil) + c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil) out := make(chan batcher.EncodingResultOrStatus) err = encodingStreamer.RequestEncoding(context.Background(), out) @@ -322,7 +323,7 @@ func TestEncodingFailure(t *testing.T) { metadataKey, err := blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano())) assert.Nil(t, err) - cst.On("GetCurrentBlockNumber").Return(uint(10), nil) + cst.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil) encoderClient.On("EncodeBlob", tmock.Anything, tmock.Anything, tmock.Anything).Return(nil, nil, errors.New("errrrr")) // request encoding out := make(chan batcher.EncodingResultOrStatus) @@ -350,7 +351,7 @@ func TestEncodingFailure(t *testing.T) { func TestPartialBlob(t *testing.T) { encodingStreamer, c := createEncodingStreamer(t, 10, 1e12, streamerConfig) - c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil) + c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil) out := make(chan batcher.EncodingResultOrStatus) @@ -498,7 +499,7 @@ func TestIncorrectParameters(t *testing.T) { metadataKey, err := c.blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano())) assert.Nil(t, err) - c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil) + c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil) // request encoding out := make(chan batcher.EncodingResultOrStatus) @@ -519,7 +520,7 @@ func TestIncorrectParameters(t *testing.T) { func TestInvalidQuorum(t *testing.T) { encodingStreamer, c := createEncodingStreamer(t, 10, 1e12, streamerConfig) - c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil) + c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil) out := make(chan batcher.EncodingResultOrStatus) @@ -601,7 +602,7 @@ func TestGetBatch(t *testing.T) { assert.Nil(t, err) assert.Equal(t, disperser.Processing, metadata2.BlobStatus) - c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil) + c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil) // request encoding out := make(chan batcher.EncodingResultOrStatus) diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index ae149390fa..6e0d36aa35 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -58,6 +58,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name), MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), + FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name), }, TimeoutConfig: batcher.TimeoutConfig{ EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name), diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index f6d27fa5d0..8a239131b6 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -177,6 +177,13 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"), Value: 100, } + FinalizationBlockDelayFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "finalization-block-delay"), + Usage: "The block delay to use for pulling operator state in order to ensure the state is finalized", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"), + Value: 75, + } ) var requiredFlags = []cli.Flag{ @@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{ MaxNumRetriesPerBlobFlag, TargetNumChunksFlag, MaxBlobsToFetchFromStoreFlag, + FinalizationBlockDelayFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index a0d4be9ea9..c9201be618 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -146,10 +146,6 @@ func (env *Config) generateChurnerVars(ind int, graphUrl, logPath, grpcPort stri CHURNER_GRAPH_URL: graphUrl, CHURNER_INDEXER_PULL_INTERVAL: "1s", - CHURNER_STD_LOG_LEVEL: "debug", - CHURNER_FILE_LOG_LEVEL: "debug", - CHURNER_LOG_PATH: logPath, - CHURNER_ENABLE_METRICS: "true", CHURNER_METRICS_HTTP_PORT: "9095", } @@ -206,12 +202,8 @@ func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) B BATCHER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver, BATCHER_EIGENDA_SERVICE_MANAGER: env.EigenDA.ServiceManager, BATCHER_SRS_ORDER: "300000", - BATCHER_SRS_LOAD: "300000", BATCHER_CHAIN_RPC: "", BATCHER_PRIVATE_KEY: key[2:], - BATCHER_STD_LOG_LEVEL: "debug", - BATCHER_FILE_LOG_LEVEL: "debug", - BATCHER_LOG_PATH: logPath, BATCHER_GRAPH_URL: graphUrl, BATCHER_USE_GRAPH: "true", BATCHER_BATCH_SIZE_LIMIT: "10240", // 10 GiB @@ -224,6 +216,7 @@ func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) B BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500", BATCHER_NUM_CONFIRMATIONS: "0", BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE: "100", + BATCHER_FINALIZATION_BLOCK_DELAY: "5", } env.applyDefaults(&v, "BATCHER", "batcher", ind) @@ -306,9 +299,6 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, NODE_VERBOSE: "true", NODE_CHAIN_RPC: "", NODE_PRIVATE_KEY: key[2:], - NODE_STD_LOG_LEVEL: "debug", - NODE_FILE_LOG_LEVEL: "debug", - NODE_LOG_PATH: logPath, NODE_NUM_BATCH_VALIDATORS: "128", NODE_PUBLIC_IP_PROVIDER: "mockip", NODE_PUBLIC_IP_CHECK_INTERVAL: "10s", @@ -345,10 +335,6 @@ func (env *Config) generateRetrieverVars(ind int, key string, graphUrl, logPath, RETRIEVER_CACHE_ENCODED_BLOBS: "false", RETRIEVER_INDEXER_PULL_INTERVAL: "1s", - - RETRIEVER_STD_LOG_LEVEL: "debug", - RETRIEVER_FILE_LOG_LEVEL: "debug", - RETRIEVER_LOG_PATH: logPath, } v.RETRIEVER_G2_PATH = "" diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index a1156f498a..e93c055e8d 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -29,12 +29,12 @@ type DisperserVars struct { DISPERSER_SERVER_NUM_CONFIRMATIONS string - DISPERSER_SERVER_STD_LOG_LEVEL string - - DISPERSER_SERVER_FILE_LOG_LEVEL string + DISPERSER_SERVER_LOG_LEVEL string DISPERSER_SERVER_LOG_PATH string + DISPERSER_SERVER_LOG_FORMAT string + DISPERSER_SERVER_BUCKET_SIZES string DISPERSER_SERVER_BUCKET_MULTIPLIERS string @@ -43,8 +43,6 @@ type DisperserVars struct { DISPERSER_SERVER_BUCKET_STORE_SIZE string - DISPERSER_SERVER_ALLOWLIST string - DISPERSER_SERVER_AWS_REGION string DISPERSER_SERVER_AWS_ACCESS_KEY_ID string @@ -64,6 +62,8 @@ type DisperserVars struct { DISPERSER_SERVER_PER_USER_UNAUTH_BLOB_RATE string DISPERSER_SERVER_CLIENT_IP_HEADER string + + DISPERSER_SERVER_ALLOWLIST string } func (vars DisperserVars) getEnvMap() map[string]string { @@ -98,8 +98,6 @@ type BatcherVars struct { BATCHER_SRS_ORDER string - BATCHER_SRS_LOAD string - BATCHER_METRICS_HTTP_PORT string BATCHER_INDEXER_DATA_DIR string @@ -116,22 +114,30 @@ type BatcherVars struct { BATCHER_FINALIZER_INTERVAL string + BATCHER_FINALIZER_POOL_SIZE string + BATCHER_ENCODING_REQUEST_QUEUE_SIZE string BATCHER_MAX_NUM_RETRIES_PER_BLOB string + BATCHER_TARGET_NUM_CHUNKS string + + BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE string + + BATCHER_FINALIZATION_BLOCK_DELAY string + BATCHER_CHAIN_RPC string BATCHER_PRIVATE_KEY string BATCHER_NUM_CONFIRMATIONS string - BATCHER_STD_LOG_LEVEL string - - BATCHER_FILE_LOG_LEVEL string + BATCHER_LOG_LEVEL string BATCHER_LOG_PATH string + BATCHER_LOG_FORMAT string + BATCHER_INDEXER_PULL_INTERVAL string BATCHER_AWS_REGION string @@ -141,8 +147,6 @@ type BatcherVars struct { BATCHER_AWS_SECRET_ACCESS_KEY string BATCHER_AWS_ENDPOINT_URL string - - BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE string } func (vars BatcherVars) getEnvMap() map[string]string { @@ -183,11 +187,13 @@ type EncoderVars struct { DISPERSER_ENCODER_PRELOAD_ENCODER string - DISPERSER_ENCODER_STD_LOG_LEVEL string + DISPERSER_ENCODER_G2_POWER_OF_2_PATH string - DISPERSER_ENCODER_FILE_LOG_LEVEL string + DISPERSER_ENCODER_LOG_LEVEL string DISPERSER_ENCODER_LOG_PATH string + + DISPERSER_ENCODER_LOG_FORMAT string } func (vars EncoderVars) getEnvMap() map[string]string { @@ -264,8 +270,6 @@ type OperatorVars struct { NODE_G2_PATH string - NODE_G2_POWER_OF_2_PATH string - NODE_CACHE_PATH string NODE_SRS_ORDER string @@ -280,17 +284,19 @@ type OperatorVars struct { NODE_PRELOAD_ENCODER string + NODE_G2_POWER_OF_2_PATH string + NODE_CHAIN_RPC string NODE_PRIVATE_KEY string NODE_NUM_CONFIRMATIONS string - NODE_STD_LOG_LEVEL string - - NODE_FILE_LOG_LEVEL string + NODE_LOG_LEVEL string NODE_LOG_PATH string + + NODE_LOG_FORMAT string } func (vars OperatorVars) getEnvMap() map[string]string { @@ -327,8 +333,6 @@ type RetrieverVars struct { RETRIEVER_G2_PATH string - RETRIEVER_G2_POWER_OF_2_PATH string - RETRIEVER_CACHE_PATH string RETRIEVER_SRS_ORDER string @@ -343,18 +347,20 @@ type RetrieverVars struct { RETRIEVER_PRELOAD_ENCODER string + RETRIEVER_G2_POWER_OF_2_PATH string + RETRIEVER_CHAIN_RPC string RETRIEVER_PRIVATE_KEY string RETRIEVER_NUM_CONFIRMATIONS string - RETRIEVER_STD_LOG_LEVEL string - - RETRIEVER_FILE_LOG_LEVEL string + RETRIEVER_LOG_LEVEL string RETRIEVER_LOG_PATH string + RETRIEVER_LOG_FORMAT string + RETRIEVER_INDEXER_PULL_INTERVAL string } @@ -390,12 +396,12 @@ type ChurnerVars struct { CHURNER_NUM_CONFIRMATIONS string - CHURNER_STD_LOG_LEVEL string - - CHURNER_FILE_LOG_LEVEL string + CHURNER_LOG_LEVEL string CHURNER_LOG_PATH string + CHURNER_LOG_FORMAT string + CHURNER_INDEXER_PULL_INTERVAL string } diff --git a/inabox/templates/testconfig-anvil-docker.yaml b/inabox/templates/testconfig-anvil-docker.yaml index b5b4a691a6..6f15261aec 100644 --- a/inabox/templates/testconfig-anvil-docker.yaml +++ b/inabox/templates/testconfig-anvil-docker.yaml @@ -45,7 +45,6 @@ services: SRS_LOAD: 2900 CHALLENGE_ORDER: 3000 LOG_LEVEL: "debug" - LOG_FORMAT: "text" VERBOSE: true NUM_CONNECTIONS: 50 AWS_ENDPOINT_URL: http://host.docker.internal:4570 diff --git a/inabox/templates/testconfig-anvil.yaml b/inabox/templates/testconfig-anvil.yaml index aa1bf47877..e43ec64b47 100644 --- a/inabox/templates/testconfig-anvil.yaml +++ b/inabox/templates/testconfig-anvil.yaml @@ -45,7 +45,6 @@ services: SRS_LOAD: 2900 CHALLENGE_ORDER: 3000 LOG_LEVEL: "debug" - LOG_FORMAT: "text" VERBOSE: true NUM_CONNECTIONS: 50 AWS_ENDPOINT_URL: http://localhost:4570