diff --git a/op-node/rollup/derive/calldata_source.go b/op-node/rollup/derive/calldata_source.go index d6bc0277e115..2c9808e7fd22 100644 --- a/op-node/rollup/derive/calldata_source.go +++ b/op-node/rollup/derive/calldata_source.go @@ -41,7 +41,7 @@ func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, daCfg *rollup.DACo } // OpenData returns a DataIter. This struct implements the `Next` function. -func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter { +func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) (DataIter, error) { return NewDataSource(ctx, ds.log, ds.cfg, ds.daCfg, ds.fetcher, id, batcherAddr) } @@ -64,7 +64,7 @@ type DataSource struct { // NewDataSource creates a new calldata source. It suppresses errors in fetching the L1 block if they occur. // If there is an error, it will attempt to fetch the result on the next call to `Next`. -func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCfg *rollup.DAConfig, fetcher L1TransactionFetcher, block eth.BlockID, batcherAddr common.Address) DataIter { +func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCfg *rollup.DAConfig, fetcher L1TransactionFetcher, block eth.BlockID, batcherAddr common.Address) (DataIter, error) { _, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash) if err != nil { return &DataSource{ @@ -74,7 +74,7 @@ func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCf fetcher: fetcher, log: log, batcherAddr: batcherAddr, - } + }, nil } else { data, err := DataFromEVMTransactions(cfg, daCfg, batcherAddr, txs, log.New("origin", block)) if err != nil { @@ -85,12 +85,12 @@ func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCf fetcher: fetcher, log: log, batcherAddr: batcherAddr, - } + }, err } return &DataSource{ open: true, data: data, - } + }, nil } } @@ -103,7 +103,8 @@ func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) { ds.open = true ds.data, err = DataFromEVMTransactions(ds.cfg, ds.daCfg, ds.batcherAddr, txs, log.New("origin", ds.id)) if err != nil { - return nil, NewTemporaryError(fmt.Errorf("failed to open calldata source: %w", err)) + // already wrapped + return nil, err } } else if errors.Is(err, ethereum.NotFound) { return nil, NewResetError(fmt.Errorf("failed to open calldata source: %w", err)) @@ -143,15 +144,14 @@ func DataFromEVMTransactions(config *rollup.Config, daCfg *rollup.DAConfig, batc height, index, err := decodeETHData(tx.Data()) if err != nil { log.Warn("unable to decode data pointer", "index", j, "err", err) - continue + return nil, err } - log.Warn("requesting celestia namespaced data", "namespace", hex.EncodeToString(daCfg.Namespace.Bytes()), "height", height) + log.Info("requesting data from celestia", "namespace", hex.EncodeToString(daCfg.Namespace.Bytes()), "height", height) data, err := daCfg.Client.NamespacedData(context.Background(), daCfg.Namespace, uint64(height)) if err != nil { - log.Warn("unable to retrieve data from da", "err", err) - continue + return nil, NewResetError(fmt.Errorf("failed to retrieve data from celestia: %w", err)) } - log.Warn("retrieved data", "data", hex.EncodeToString(data[index])) + log.Debug("retrieved data", "data", hex.EncodeToString(data[index])) out = append(out, data[index]) } else { out = append(out, tx.Data()) diff --git a/op-node/rollup/derive/l1_retrieval.go b/op-node/rollup/derive/l1_retrieval.go index 586e125acc32..db53ab2fff23 100644 --- a/op-node/rollup/derive/l1_retrieval.go +++ b/op-node/rollup/derive/l1_retrieval.go @@ -11,7 +11,7 @@ import ( ) type DataAvailabilitySource interface { - OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter + OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) (DataIter, error) } type NextBlockProvider interface { @@ -53,7 +53,10 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) { } else if err != nil { return nil, err } - l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID(), l1r.prev.SystemConfig().BatcherAddr) + l1r.datas, err = l1r.dataSrc.OpenData(ctx, next.ID(), l1r.prev.SystemConfig().BatcherAddr) + if err != nil { + return nil, err + } } l1r.log.Debug("fetching next piece of data") @@ -73,7 +76,7 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) { // Note that we open up the `l1r.datas` here because it is requires to maintain the // internal invariants that later propagate up the derivation pipeline. func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef, sysCfg eth.SystemConfig) error { - l1r.datas = l1r.dataSrc.OpenData(ctx, base.ID(), sysCfg.BatcherAddr) + l1r.datas, _ = l1r.dataSrc.OpenData(ctx, base.ID(), sysCfg.BatcherAddr) l1r.log.Info("Reset of L1Retrieval done", "origin", base) return io.EOF }