Skip to content

Commit

Permalink
op-node: calldata source - reset pipeline on err (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuxcanfly committed Nov 28, 2023
1 parent 76ea4d3 commit 998cc21
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
22 changes: 11 additions & 11 deletions op-node/rollup/derive/calldata_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, daCfg *rollup.DACo
}

// OpenData returns a DataIter. This struct implements the `Next` function.
func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter {
func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) (DataIter, error) {
return NewDataSource(ctx, ds.log, ds.cfg, ds.daCfg, ds.fetcher, id, batcherAddr)
}

Expand All @@ -64,7 +64,7 @@ type DataSource struct {

// NewDataSource creates a new calldata source. It suppresses errors in fetching the L1 block if they occur.
// If there is an error, it will attempt to fetch the result on the next call to `Next`.
func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCfg *rollup.DAConfig, fetcher L1TransactionFetcher, block eth.BlockID, batcherAddr common.Address) DataIter {
func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCfg *rollup.DAConfig, fetcher L1TransactionFetcher, block eth.BlockID, batcherAddr common.Address) (DataIter, error) {
_, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash)
if err != nil {
return &DataSource{
Expand All @@ -74,7 +74,7 @@ func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCf
fetcher: fetcher,
log: log,
batcherAddr: batcherAddr,
}
}, nil
} else {
data, err := DataFromEVMTransactions(cfg, daCfg, batcherAddr, txs, log.New("origin", block))
if err != nil {
Expand All @@ -85,12 +85,12 @@ func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, daCf
fetcher: fetcher,
log: log,
batcherAddr: batcherAddr,
}
}, err
}
return &DataSource{
open: true,
data: data,
}
}, nil
}
}

Expand All @@ -103,7 +103,8 @@ func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) {
ds.open = true
ds.data, err = DataFromEVMTransactions(ds.cfg, ds.daCfg, ds.batcherAddr, txs, log.New("origin", ds.id))
if err != nil {
return nil, NewTemporaryError(fmt.Errorf("failed to open calldata source: %w", err))
// already wrapped
return nil, err
}
} else if errors.Is(err, ethereum.NotFound) {
return nil, NewResetError(fmt.Errorf("failed to open calldata source: %w", err))
Expand Down Expand Up @@ -143,15 +144,14 @@ func DataFromEVMTransactions(config *rollup.Config, daCfg *rollup.DAConfig, batc
height, index, err := decodeETHData(tx.Data())
if err != nil {
log.Warn("unable to decode data pointer", "index", j, "err", err)
continue
return nil, err
}
log.Warn("requesting celestia namespaced data", "namespace", hex.EncodeToString(daCfg.Namespace.Bytes()), "height", height)
log.Info("requesting data from celestia", "namespace", hex.EncodeToString(daCfg.Namespace.Bytes()), "height", height)
data, err := daCfg.Client.NamespacedData(context.Background(), daCfg.Namespace, uint64(height))
if err != nil {
log.Warn("unable to retrieve data from da", "err", err)
continue
return nil, NewResetError(fmt.Errorf("failed to retrieve data from celestia: %w", err))
}
log.Warn("retrieved data", "data", hex.EncodeToString(data[index]))
log.Debug("retrieved data", "data", hex.EncodeToString(data[index]))
out = append(out, data[index])
} else {
out = append(out, tx.Data())
Expand Down
9 changes: 6 additions & 3 deletions op-node/rollup/derive/l1_retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type DataAvailabilitySource interface {
OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter
OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) (DataIter, error)
}

type NextBlockProvider interface {
Expand Down Expand Up @@ -53,7 +53,10 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) {
} else if err != nil {
return nil, err
}
l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID(), l1r.prev.SystemConfig().BatcherAddr)
l1r.datas, err = l1r.dataSrc.OpenData(ctx, next.ID(), l1r.prev.SystemConfig().BatcherAddr)
if err != nil {
return nil, err
}
}

l1r.log.Debug("fetching next piece of data")
Expand All @@ -73,7 +76,7 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) {
// Note that we open up the `l1r.datas` here because it is requires to maintain the
// internal invariants that later propagate up the derivation pipeline.
func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef, sysCfg eth.SystemConfig) error {
l1r.datas = l1r.dataSrc.OpenData(ctx, base.ID(), sysCfg.BatcherAddr)
l1r.datas, _ = l1r.dataSrc.OpenData(ctx, base.ID(), sysCfg.BatcherAddr)
l1r.log.Info("Reset of L1Retrieval done", "origin", base)
return io.EOF
}

0 comments on commit 998cc21

Please sign in to comment.