Skip to content

Commit

Permalink
Use finalized reference block (#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph authored Mar 20, 2024
1 parent 6efe3b8 commit 30b6bb4
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 52 deletions.
12 changes: 11 additions & 1 deletion contracts/src/core/EigenDAServiceManagerStorage.sol
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,18 @@ abstract contract EigenDAServiceManagerStorage is IEigenDAServiceManager {
* @dev BLOCK_STALE_MEASURE should be greater than the number of blocks till finalization, but not too much greater, as it is the amount of
* time that nodes can be active after they have deregistered. The larger it is, the farther back stakes can be used, but the longer operators
* have to serve after they've deregistered.
*
* Note that this parameter needs to accommodate the delays which are introduced by the disperser, which are of two types:
* - FinalizationBlockDelay: when initializing a batch, the disperser will use a ReferenceBlockNumber which is this many
* blocks behind the current block number. This is to ensure that the the operator state associated with the reference block
* will be stable.
* - BatchInterval: the batch itself will only be confirmed after the batch interval has passed.
*
* Currently, we use a FinalizationBlockDelay of 75 blocks and a BatchInterval of 50 blocks,
* So using a BLOCK_STALE_MEASURE of 300 should be sufficient to ensure that the batch is not
* stale when it is confirmed.
*/
uint32 public constant BLOCK_STALE_MEASURE = 150;
uint32 public constant BLOCK_STALE_MEASURE = 300;

/**
* @notice The quorum adversary threshold percentages stored as an ordered bytes array
Expand Down
3 changes: 3 additions & 0 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Config struct {
BatchSizeMBLimit uint
MaxNumRetriesPerBlob uint

FinalizationBlockDelay uint

TargetNumChunks uint
MaxBlobsToFetchFromStore int
}
Expand Down Expand Up @@ -110,6 +112,7 @@ func NewBatcher(
EncodingQueueLimit: config.EncodingRequestQueueSize,
TargetNumChunks: config.TargetNumChunks,
MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
FinalizationBlockDelay: config.FinalizationBlockDelay,
}
encodingWorkerPool := workerpool.New(config.NumConnections)
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
Expand Down
5 changes: 4 additions & 1 deletion disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
// Common Components
logger := logging.NewNoopLogger()

finalizationBlockDelay := uint(75)

// Core Components
cst, err := coremock.MakeChainDataMock(10)
assert.NoError(t, err)
cst.On("GetCurrentBlockNumber").Return(uint(10), nil)
cst.On("GetCurrentBlockNumber").Return(uint(10)+finalizationBlockDelay, nil)
asgn := &core.StdAssignmentCoordinator{}
transactor := &coremock.MockTransactor{}
transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil)
Expand All @@ -99,6 +101,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
BatchSizeMBLimit: 100,
SRSOrder: 3000,
MaxNumRetriesPerBlob: 2,
FinalizationBlockDelay: finalizationBlockDelay,
}
timeoutConfig := bat.TimeoutConfig{
EncodingTimeout: 10 * time.Second,
Expand Down
6 changes: 6 additions & 0 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type StreamerConfig struct {

// Maximum number of Blobs to fetch from store
MaxBlobsToFetchFromStore int

FinalizationBlockDelay uint
}

type EncodingStreamer struct {
Expand Down Expand Up @@ -208,6 +210,10 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan
if err != nil {
return fmt.Errorf("failed to get current block number, won't request encoding: %w", err)
} else {
if blockNumber > e.FinalizationBlockDelay {
blockNumber -= e.FinalizationBlockDelay
}

e.mu.Lock()
e.ReferenceBlockNumber = blockNumber
e.mu.Unlock()
Expand Down
13 changes: 7 additions & 6 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
MaxBlobsToFetchFromStore: 10,
FinalizationBlockDelay: 75,
}
)

Expand Down Expand Up @@ -206,7 +207,7 @@ func TestStreamingEncoding(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, disperser.Processing, metadata.BlobStatus)

c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil)
c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil)

out := make(chan batcher.EncodingResultOrStatus)
err = encodingStreamer.RequestEncoding(context.Background(), out)
Expand Down Expand Up @@ -322,7 +323,7 @@ func TestEncodingFailure(t *testing.T) {
metadataKey, err := blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano()))
assert.Nil(t, err)

cst.On("GetCurrentBlockNumber").Return(uint(10), nil)
cst.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil)
encoderClient.On("EncodeBlob", tmock.Anything, tmock.Anything, tmock.Anything).Return(nil, nil, errors.New("errrrr"))
// request encoding
out := make(chan batcher.EncodingResultOrStatus)
Expand Down Expand Up @@ -350,7 +351,7 @@ func TestEncodingFailure(t *testing.T) {
func TestPartialBlob(t *testing.T) {
encodingStreamer, c := createEncodingStreamer(t, 10, 1e12, streamerConfig)

c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil)
c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil)

out := make(chan batcher.EncodingResultOrStatus)

Expand Down Expand Up @@ -498,7 +499,7 @@ func TestIncorrectParameters(t *testing.T) {
metadataKey, err := c.blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano()))
assert.Nil(t, err)

c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil)
c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil)

// request encoding
out := make(chan batcher.EncodingResultOrStatus)
Expand All @@ -519,7 +520,7 @@ func TestIncorrectParameters(t *testing.T) {
func TestInvalidQuorum(t *testing.T) {
encodingStreamer, c := createEncodingStreamer(t, 10, 1e12, streamerConfig)

c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil)
c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil)

out := make(chan batcher.EncodingResultOrStatus)

Expand Down Expand Up @@ -601,7 +602,7 @@ func TestGetBatch(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, disperser.Processing, metadata2.BlobStatus)

c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10), nil)
c.chainDataMock.On("GetCurrentBlockNumber").Return(uint(10)+encodingStreamer.FinalizationBlockDelay, nil)

// request encoding
out := make(chan batcher.EncodingResultOrStatus)
Expand Down
1 change: 1 addition & 0 deletions disperser/cmd/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name),
MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name),
},
TimeoutConfig: batcher.TimeoutConfig{
EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name),
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"),
Value: 100,
}
FinalizationBlockDelayFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "finalization-block-delay"),
Usage: "The block delay to use for pulling operator state in order to ensure the state is finalized",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"),
Value: 75,
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{
MaxNumRetriesPerBlobFlag,
TargetNumChunksFlag,
MaxBlobsToFetchFromStoreFlag,
FinalizationBlockDelayFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
16 changes: 1 addition & 15 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ func (env *Config) generateChurnerVars(ind int, graphUrl, logPath, grpcPort stri
CHURNER_GRAPH_URL: graphUrl,
CHURNER_INDEXER_PULL_INTERVAL: "1s",

CHURNER_STD_LOG_LEVEL: "debug",
CHURNER_FILE_LOG_LEVEL: "debug",
CHURNER_LOG_PATH: logPath,

CHURNER_ENABLE_METRICS: "true",
CHURNER_METRICS_HTTP_PORT: "9095",
}
Expand Down Expand Up @@ -206,12 +202,8 @@ func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) B
BATCHER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver,
BATCHER_EIGENDA_SERVICE_MANAGER: env.EigenDA.ServiceManager,
BATCHER_SRS_ORDER: "300000",
BATCHER_SRS_LOAD: "300000",
BATCHER_CHAIN_RPC: "",
BATCHER_PRIVATE_KEY: key[2:],
BATCHER_STD_LOG_LEVEL: "debug",
BATCHER_FILE_LOG_LEVEL: "debug",
BATCHER_LOG_PATH: logPath,
BATCHER_GRAPH_URL: graphUrl,
BATCHER_USE_GRAPH: "true",
BATCHER_BATCH_SIZE_LIMIT: "10240", // 10 GiB
Expand All @@ -224,6 +216,7 @@ func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) B
BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500",
BATCHER_NUM_CONFIRMATIONS: "0",
BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE: "100",
BATCHER_FINALIZATION_BLOCK_DELAY: "5",
}

env.applyDefaults(&v, "BATCHER", "batcher", ind)
Expand Down Expand Up @@ -306,9 +299,6 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath,
NODE_VERBOSE: "true",
NODE_CHAIN_RPC: "",
NODE_PRIVATE_KEY: key[2:],
NODE_STD_LOG_LEVEL: "debug",
NODE_FILE_LOG_LEVEL: "debug",
NODE_LOG_PATH: logPath,
NODE_NUM_BATCH_VALIDATORS: "128",
NODE_PUBLIC_IP_PROVIDER: "mockip",
NODE_PUBLIC_IP_CHECK_INTERVAL: "10s",
Expand Down Expand Up @@ -345,10 +335,6 @@ func (env *Config) generateRetrieverVars(ind int, key string, graphUrl, logPath,
RETRIEVER_CACHE_ENCODED_BLOBS: "false",

RETRIEVER_INDEXER_PULL_INTERVAL: "1s",

RETRIEVER_STD_LOG_LEVEL: "debug",
RETRIEVER_FILE_LOG_LEVEL: "debug",
RETRIEVER_LOG_PATH: logPath,
}

v.RETRIEVER_G2_PATH = ""
Expand Down
Loading

0 comments on commit 30b6bb4

Please sign in to comment.