Skip to content

Commit

Permalink
Merge pull request #2377 from OffchainLabs/rm_fetch_batch
Browse files Browse the repository at this point in the history
[NIT-2570] Avoids fetching batch in execution layer, consensus layer fills all necessary information regarding a batch to the execution layer
  • Loading branch information
tsahee authored Jul 29, 2024
2 parents 1b9f7ab + 65fc8ea commit 569ec66
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 105 deletions.
11 changes: 10 additions & 1 deletion arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type DelayedSequencer struct {
l1Reader *headerreader.HeaderReader
bridge *DelayedBridge
inbox *InboxTracker
reader *InboxReader
exec execution.ExecutionSequencer
coordinator *SeqCoordinator
waitingForFinalizedBlock uint64
Expand Down Expand Up @@ -68,6 +69,7 @@ func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReade
l1Reader: l1Reader,
bridge: reader.DelayedBridge(),
inbox: reader.Tracker(),
reader: reader,
coordinator: coordinator,
exec: exec,
config: config,
Expand Down Expand Up @@ -144,7 +146,7 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock
var lastDelayedAcc common.Hash
var messages []*arbostypes.L1IncomingMessage
for pos < dbDelayedCount {
msg, acc, parentChainBlockNumber, err := d.inbox.GetDelayedMessageAccumulatorAndParentChainBlockNumber(pos)
msg, acc, parentChainBlockNumber, err := d.inbox.GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, pos)
if err != nil {
return err
}
Expand All @@ -165,6 +167,13 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock
}
}
lastDelayedAcc = acc
err = msg.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
data, _, err := d.reader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})
if err != nil {
return err
}
messages = append(messages, msg)
pos++
}
Expand Down
12 changes: 8 additions & 4 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
break
}
// Validate the init message matches our L2 blockchain
message, err := r.tracker.GetDelayedMessage(0)
ctx, err := r.StopWaiter.GetContextSafe()
if err != nil {
return err
}
message, err := r.tracker.GetDelayedMessage(ctx, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -226,7 +230,7 @@ func (r *InboxReader) CaughtUp() chan struct{} {

func (r *InboxReader) run(ctx context.Context, hadError bool) error {
readMode := r.config().ReadMode
from, err := r.getNextBlockToRead()
from, err := r.getNextBlockToRead(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -584,15 +588,15 @@ func (r *InboxReader) getPrevBlockForReorg(from *big.Int) (*big.Int, error) {
return newFrom, nil
}

func (r *InboxReader) getNextBlockToRead() (*big.Int, error) {
func (r *InboxReader) getNextBlockToRead(ctx context.Context) (*big.Int, error) {
delayedCount, err := r.tracker.GetDelayedCount()
if err != nil {
return nil, err
}
if delayedCount == 0 {
return new(big.Int).Set(r.firstMessageBlock), nil
}
_, _, parentChainBlockNumber, err := r.tracker.GetDelayedMessageAccumulatorAndParentChainBlockNumber(delayedCount - 1)
_, _, parentChainBlockNumber, err := r.tracker.GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, delayedCount-1)
if err != nil {
return nil, err
}
Expand Down
33 changes: 25 additions & 8 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
return nil
}

func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {
func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {
key := dbKey(legacyDelayedMessagePrefix, seqNum)
data, err := t.db.Get(key)
if err != nil {
Expand All @@ -328,17 +328,26 @@ func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(seqNum uint64) (*ar
var acc common.Hash
copy(acc[:], data[:32])
msg, err := arbostypes.ParseIncomingL1Message(bytes.NewReader(data[32:]), nil)
if err != nil {
return nil, common.Hash{}, err
}

err = msg.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})

return msg, acc, err
}

func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, uint64, error) {
func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, uint64, error) {
delayedMessageKey := dbKey(rlpDelayedMessagePrefix, seqNum)
exists, err := t.db.Has(delayedMessageKey)
if err != nil {
return nil, common.Hash{}, 0, err
}
if !exists {
msg, acc, err := t.legacyGetDelayedMessageAndAccumulator(seqNum)
msg, acc, err := t.legacyGetDelayedMessageAndAccumulator(ctx, seqNum)
return msg, acc, 0, err
}
data, err := t.db.Get(delayedMessageKey)
Expand All @@ -356,6 +365,14 @@ func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(seq
return msg, acc, 0, err
}

err = msg.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})
if err != nil {
return msg, acc, 0, err
}

parentChainBlockNumberKey := dbKey(parentChainBlockNumberPrefix, seqNum)
exists, err = t.db.Has(parentChainBlockNumberKey)
if err != nil {
Expand All @@ -373,13 +390,13 @@ func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(seq

}

func (t *InboxTracker) GetDelayedMessage(seqNum uint64) (*arbostypes.L1IncomingMessage, error) {
msg, _, _, err := t.GetDelayedMessageAccumulatorAndParentChainBlockNumber(seqNum)
func (t *InboxTracker) GetDelayedMessage(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, error) {
msg, _, _, err := t.GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, seqNum)
return msg, err
}

func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) {
msg, err := t.GetDelayedMessage(seqNum)
func (t *InboxTracker) GetDelayedMessageBytes(ctx context.Context, seqNum uint64) ([]byte, error) {
msg, err := t.GetDelayedMessage(ctx, seqNum)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -617,7 +634,7 @@ func (b *multiplexerBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1Inco
if len(b.batches) == 0 || seqNum >= b.batches[0].AfterDelayedCount {
return nil, errors.New("attempted to read past end of sequencer batch delayed messages")
}
return b.inbox.GetDelayedMessage(seqNum)
return b.inbox.GetDelayedMessage(b.ctx, seqNum)
}

var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missing or different")
Expand Down
4 changes: 0 additions & 4 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,10 +990,6 @@ func (n *Node) StopAndWait() {
}
}

func (n *Node) FetchBatch(ctx context.Context, batchNum uint64) ([]byte, common.Hash, error) {
return n.InboxReader.GetSequencerMessageBytes(ctx, batchNum)
}

func (n *Node) FindInboxBatchContainingMessage(message arbutil.MessageIndex) (uint64, bool, error) {
return n.InboxTracker.FindInboxBatchContainingMessage(message)
}
Expand Down
12 changes: 12 additions & 0 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,18 @@ func (s *TransactionStreamer) GetMessage(seqNum arbutil.MessageIndex) (*arbostyp
return nil, err
}

err = message.Message.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
ctx, err := s.GetContextSafe()
if err != nil {
return nil, err
}
data, _, err := s.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})
if err != nil {
return nil, err
}

return &message, nil
}

Expand Down
20 changes: 1 addition & 19 deletions arbos/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
Expand Down Expand Up @@ -144,26 +143,9 @@ func ProduceBlock(
statedb *state.StateDB,
chainContext core.ChainContext,
chainConfig *params.ChainConfig,
batchFetcher arbostypes.FallibleBatchFetcher,
isMsgForPrefetch bool,
) (*types.Block, types.Receipts, error) {
var batchFetchErr error
txes, err := ParseL2Transactions(message, chainConfig.ChainID, func(batchNum uint64, batchHash common.Hash) []byte {
data, err := batchFetcher(batchNum)
if err != nil {
batchFetchErr = err
return nil
}
dataHash := crypto.Keccak256Hash(data)
if dataHash != batchHash {
batchFetchErr = fmt.Errorf("expecting batch %v hash %v but got data with hash %v", batchNum, batchHash, dataHash)
return nil
}
return data
})
if batchFetchErr != nil {
return nil, nil, batchFetchErr
}
txes, err := ParseL2Transactions(message, chainConfig.ChainID)
if err != nil {
log.Warn("error parsing incoming message", "err", err)
txes = types.Transactions{}
Expand Down
2 changes: 1 addition & 1 deletion arbos/incomingmessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestSerializeAndParseL1Message(t *testing.T) {
if err != nil {
t.Error(err)
}
txes, err := ParseL2Transactions(newMsg, chainId, nil)
txes, err := ParseL2Transactions(newMsg, chainId)
if err != nil {
t.Error(err)
}
Expand Down
13 changes: 5 additions & 8 deletions arbos/parse_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/offchainlabs/nitro/util/arbmath"
)

type InfallibleBatchFetcher func(batchNum uint64, batchHash common.Hash) []byte

func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int, batchFetcher InfallibleBatchFetcher) (types.Transactions, error) {
func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int) (types.Transactions, error) {
if len(msg.L2msg) > arbostypes.MaxL2MessageSize {
// ignore the message if l2msg is too large
return nil, errors.New("message too large")
Expand Down Expand Up @@ -71,7 +69,7 @@ func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int, ba
log.Debug("ignoring rollup event message")
return types.Transactions{}, nil
case arbostypes.L1MessageType_BatchPostingReport:
tx, err := parseBatchPostingReportMessage(bytes.NewReader(msg.L2msg), chainId, msg.BatchGasCost, batchFetcher)
tx, err := parseBatchPostingReportMessage(bytes.NewReader(msg.L2msg), chainId, msg.BatchGasCost)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -370,17 +368,16 @@ func parseSubmitRetryableMessage(rd io.Reader, header *arbostypes.L1IncomingMess
return types.NewTx(tx), err
}

func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasCost *uint64, batchFetcher InfallibleBatchFetcher) (*types.Transaction, error) {
batchTimestamp, batchPosterAddr, batchHash, batchNum, l1BaseFee, extraGas, err := arbostypes.ParseBatchPostingReportMessageFields(rd)
func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasCost *uint64) (*types.Transaction, error) {
batchTimestamp, batchPosterAddr, _, batchNum, l1BaseFee, extraGas, err := arbostypes.ParseBatchPostingReportMessageFields(rd)
if err != nil {
return nil, err
}
var batchDataGas uint64
if msgBatchGasCost != nil {
batchDataGas = *msgBatchGasCost
} else {
batchData := batchFetcher(batchNum, batchHash)
batchDataGas = arbostypes.ComputeBatchGasCost(batchData)
return nil, errors.New("cannot compute batch gas cost")
}
batchDataGas = arbmath.SaturatingUAdd(batchDataGas, extraGas)

Expand Down
17 changes: 12 additions & 5 deletions cmd/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ func main() {
panic(fmt.Sprintf("Error opening state db: %v", err.Error()))
}

batchFetcher := func(batchNum uint64) ([]byte, error) {
currentBatch := wavmio.GetInboxPosition()
if batchNum > currentBatch {
return nil, fmt.Errorf("invalid batch fetch request %d, max %d", batchNum, currentBatch)
}
return wavmio.ReadInboxMessage(batchNum), nil
}
readMessage := func(dasEnabled bool) *arbostypes.MessageWithMetadata {
var delayedMessagesRead uint64
if lastBlockHeader != nil {
Expand Down Expand Up @@ -232,6 +239,10 @@ func main() {
panic(fmt.Sprintf("Error reading from inbox multiplexer: %v", err.Error()))
}

err = message.Message.FillInBatchGasCost(batchFetcher)
if err != nil {
message.Message = arbostypes.InvalidL1Message
}
return message
}

Expand Down Expand Up @@ -280,14 +291,10 @@ func main() {
message := readMessage(chainConfig.ArbitrumChainParams.DataAvailabilityCommittee)

chainContext := WavmChainContext{}
batchFetcher := func(batchNum uint64) ([]byte, error) {
return wavmio.ReadInboxMessage(batchNum), nil
}
newBlock, _, err = arbos.ProduceBlock(message.Message, message.DelayedMessagesRead, lastBlockHeader, statedb, chainContext, chainConfig, batchFetcher, false)
newBlock, _, err = arbos.ProduceBlock(message.Message, message.DelayedMessagesRead, lastBlockHeader, statedb, chainContext, chainConfig, false)
if err != nil {
panic(err)
}

} else {
// Initialize ArbOS with this init message and create the genesis block.

Expand Down
19 changes: 0 additions & 19 deletions execution/gethexec/block_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/validator"
)

// BlockRecorder uses a separate statedatabase from the blockchain.
Expand Down Expand Up @@ -120,31 +119,14 @@ func (r *BlockRecorder) RecordBlockCreation(
}

var blockHash common.Hash
var readBatchInfo []validator.BatchInfo
if msg != nil {
batchFetcher := func(batchNum uint64) ([]byte, error) {
data, blockHash, err := r.execEngine.consensus.FetchBatch(ctx, batchNum)
if err != nil {
return nil, err
}
readBatchInfo = append(readBatchInfo, validator.BatchInfo{
Number: batchNum,
BlockHash: blockHash,
Data: data,
})
return data, nil
}
// Re-fetch the batch instead of using our cached cost,
// as the replay binary won't have the cache populated.
msg.Message.BatchGasCost = nil
block, _, err := arbos.ProduceBlock(
msg.Message,
msg.DelayedMessagesRead,
prevHeader,
recordingdb,
chaincontext,
chainConfig,
batchFetcher,
false,
)
if err != nil {
Expand Down Expand Up @@ -172,7 +154,6 @@ func (r *BlockRecorder) RecordBlockCreation(
Pos: pos,
BlockHash: blockHash,
Preimages: preimages,
BatchInfo: readBatchInfo,
UserWasms: recordingdb.UserWasms(),
}, err
}
Expand Down
9 changes: 1 addition & 8 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ func (s *ExecutionEngine) resequenceReorgedMessages(messages []*arbostypes.Messa
log.Warn("skipping non-standard sequencer message found from reorg", "header", header)
continue
}
// We don't need a batch fetcher as this is an L2 message
txes, err := arbos.ParseL2Transactions(msg.Message, s.bc.Config().ChainID, nil)
txes, err := arbos.ParseL2Transactions(msg.Message, s.bc.Config().ChainID)
if err != nil {
log.Warn("failed to parse sequencer message found from reorg", "err", err)
continue
Expand Down Expand Up @@ -625,19 +624,13 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith
statedb.StartPrefetcher("TransactionStreamer")
defer statedb.StopPrefetcher()

batchFetcher := func(num uint64) ([]byte, error) {
data, _, err := s.consensus.FetchBatch(s.GetContext(), num)
return data, err
}

block, receipts, err := arbos.ProduceBlock(
msg.Message,
msg.DelayedMessagesRead,
currentHeader,
statedb,
s.bc,
s.bc.Config(),
batchFetcher,
isMsgForPrefetch,
)

Expand Down
Loading

0 comments on commit 569ec66

Please sign in to comment.