Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2570] Avoids fetching batch in execution layer, consensus layer fills all necessary information regarding a batch to the execution layer #2377

Merged
merged 8 commits into from
Jul 29, 2024
11 changes: 10 additions & 1 deletion arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type DelayedSequencer struct {
l1Reader *headerreader.HeaderReader
bridge *DelayedBridge
inbox *InboxTracker
reader *InboxReader
exec execution.ExecutionSequencer
coordinator *SeqCoordinator
waitingForFinalizedBlock uint64
Expand Down Expand Up @@ -68,6 +69,7 @@ func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReade
l1Reader: l1Reader,
bridge: reader.DelayedBridge(),
inbox: reader.Tracker(),
reader: reader,
coordinator: coordinator,
exec: exec,
config: config,
Expand Down Expand Up @@ -144,7 +146,7 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock
var lastDelayedAcc common.Hash
var messages []*arbostypes.L1IncomingMessage
for pos < dbDelayedCount {
msg, acc, parentChainBlockNumber, err := d.inbox.GetDelayedMessageAccumulatorAndParentChainBlockNumber(pos)
msg, acc, parentChainBlockNumber, err := d.inbox.GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, pos)
if err != nil {
return err
}
Expand All @@ -165,6 +167,13 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock
}
}
lastDelayedAcc = acc
err = msg.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
data, _, err := d.reader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})
if err != nil {
return err
}
messages = append(messages, msg)
pos++
}
Expand Down
12 changes: 8 additions & 4 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
break
}
// Validate the init message matches our L2 blockchain
message, err := r.tracker.GetDelayedMessage(0)
ctx, err := r.StopWaiter.GetContextSafe()
if err != nil {
return err
}
message, err := r.tracker.GetDelayedMessage(ctx, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -226,7 +230,7 @@ func (r *InboxReader) CaughtUp() chan struct{} {

func (r *InboxReader) run(ctx context.Context, hadError bool) error {
readMode := r.config().ReadMode
from, err := r.getNextBlockToRead()
from, err := r.getNextBlockToRead(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -584,15 +588,15 @@ func (r *InboxReader) getPrevBlockForReorg(from *big.Int) (*big.Int, error) {
return newFrom, nil
}

func (r *InboxReader) getNextBlockToRead() (*big.Int, error) {
func (r *InboxReader) getNextBlockToRead(ctx context.Context) (*big.Int, error) {
delayedCount, err := r.tracker.GetDelayedCount()
if err != nil {
return nil, err
}
if delayedCount == 0 {
return new(big.Int).Set(r.firstMessageBlock), nil
}
_, _, parentChainBlockNumber, err := r.tracker.GetDelayedMessageAccumulatorAndParentChainBlockNumber(delayedCount - 1)
_, _, parentChainBlockNumber, err := r.tracker.GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, delayedCount-1)
if err != nil {
return nil, err
}
Expand Down
33 changes: 25 additions & 8 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
return nil
}

func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {
func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {
key := dbKey(legacyDelayedMessagePrefix, seqNum)
data, err := t.db.Get(key)
if err != nil {
Expand All @@ -328,17 +328,26 @@ func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(seqNum uint64) (*ar
var acc common.Hash
copy(acc[:], data[:32])
msg, err := arbostypes.ParseIncomingL1Message(bytes.NewReader(data[32:]), nil)
if err != nil {
return nil, common.Hash{}, err
}

err = msg.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})

return msg, acc, err
}

func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, uint64, error) {
func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, uint64, error) {
delayedMessageKey := dbKey(rlpDelayedMessagePrefix, seqNum)
exists, err := t.db.Has(delayedMessageKey)
if err != nil {
return nil, common.Hash{}, 0, err
}
if !exists {
msg, acc, err := t.legacyGetDelayedMessageAndAccumulator(seqNum)
msg, acc, err := t.legacyGetDelayedMessageAndAccumulator(ctx, seqNum)
return msg, acc, 0, err
}
data, err := t.db.Get(delayedMessageKey)
Expand All @@ -356,6 +365,14 @@ func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(seq
return msg, acc, 0, err
}

err = msg.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})
if err != nil {
return msg, acc, 0, err
}

parentChainBlockNumberKey := dbKey(parentChainBlockNumberPrefix, seqNum)
exists, err = t.db.Has(parentChainBlockNumberKey)
if err != nil {
Expand All @@ -373,13 +390,13 @@ func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(seq

}

func (t *InboxTracker) GetDelayedMessage(seqNum uint64) (*arbostypes.L1IncomingMessage, error) {
msg, _, _, err := t.GetDelayedMessageAccumulatorAndParentChainBlockNumber(seqNum)
func (t *InboxTracker) GetDelayedMessage(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, error) {
msg, _, _, err := t.GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, seqNum)
return msg, err
}

func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) {
msg, err := t.GetDelayedMessage(seqNum)
func (t *InboxTracker) GetDelayedMessageBytes(ctx context.Context, seqNum uint64) ([]byte, error) {
msg, err := t.GetDelayedMessage(ctx, seqNum)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -617,7 +634,7 @@ func (b *multiplexerBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1Inco
if len(b.batches) == 0 || seqNum >= b.batches[0].AfterDelayedCount {
return nil, errors.New("attempted to read past end of sequencer batch delayed messages")
}
return b.inbox.GetDelayedMessage(seqNum)
return b.inbox.GetDelayedMessage(b.ctx, seqNum)
}

var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missing or different")
Expand Down
4 changes: 0 additions & 4 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,10 +990,6 @@ func (n *Node) StopAndWait() {
}
}

func (n *Node) FetchBatch(ctx context.Context, batchNum uint64) ([]byte, common.Hash, error) {
return n.InboxReader.GetSequencerMessageBytes(ctx, batchNum)
}

func (n *Node) FindInboxBatchContainingMessage(message arbutil.MessageIndex) (uint64, bool, error) {
return n.InboxTracker.FindInboxBatchContainingMessage(message)
}
Expand Down
12 changes: 12 additions & 0 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,18 @@ func (s *TransactionStreamer) GetMessage(seqNum arbutil.MessageIndex) (*arbostyp
return nil, err
}

err = message.Message.FillInBatchGasCost(func(batchNum uint64) ([]byte, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to make absolutley sure that in every way we can get a message to consensus client, it will always have the batchGasCost Filled.

I think there are other cases we want to cover, specifically:
legacyGetDelayedMessageAndAccumulator
GetDelayedMessageAccumulatorAndParentChainBlockNumber

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covered those other cases.

We don't have tests covering L1MessageType_BatchPostingReport today, right?

ctx, err := s.GetContextSafe()
if err != nil {
return nil, err
}
data, _, err := s.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})
if err != nil {
return nil, err
}

return &message, nil
}

Expand Down
20 changes: 1 addition & 19 deletions arbos/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
Expand Down Expand Up @@ -144,26 +143,9 @@ func ProduceBlock(
statedb *state.StateDB,
chainContext core.ChainContext,
chainConfig *params.ChainConfig,
batchFetcher arbostypes.FallibleBatchFetcher,
isMsgForPrefetch bool,
) (*types.Block, types.Receipts, error) {
var batchFetchErr error
txes, err := ParseL2Transactions(message, chainConfig.ChainID, func(batchNum uint64, batchHash common.Hash) []byte {
data, err := batchFetcher(batchNum)
if err != nil {
batchFetchErr = err
return nil
}
dataHash := crypto.Keccak256Hash(data)
if dataHash != batchHash {
batchFetchErr = fmt.Errorf("expecting batch %v hash %v but got data with hash %v", batchNum, batchHash, dataHash)
return nil
}
return data
})
if batchFetchErr != nil {
return nil, nil, batchFetchErr
}
txes, err := ParseL2Transactions(message, chainConfig.ChainID)
if err != nil {
log.Warn("error parsing incoming message", "err", err)
txes = types.Transactions{}
Expand Down
2 changes: 1 addition & 1 deletion arbos/incomingmessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestSerializeAndParseL1Message(t *testing.T) {
if err != nil {
t.Error(err)
}
txes, err := ParseL2Transactions(newMsg, chainId, nil)
txes, err := ParseL2Transactions(newMsg, chainId)
if err != nil {
t.Error(err)
}
Expand Down
13 changes: 5 additions & 8 deletions arbos/parse_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/offchainlabs/nitro/util/arbmath"
)

type InfallibleBatchFetcher func(batchNum uint64, batchHash common.Hash) []byte

func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int, batchFetcher InfallibleBatchFetcher) (types.Transactions, error) {
func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int) (types.Transactions, error) {
if len(msg.L2msg) > arbostypes.MaxL2MessageSize {
// ignore the message if l2msg is too large
return nil, errors.New("message too large")
Expand Down Expand Up @@ -71,7 +69,7 @@ func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int, ba
log.Debug("ignoring rollup event message")
return types.Transactions{}, nil
case arbostypes.L1MessageType_BatchPostingReport:
tx, err := parseBatchPostingReportMessage(bytes.NewReader(msg.L2msg), chainId, msg.BatchGasCost, batchFetcher)
tx, err := parseBatchPostingReportMessage(bytes.NewReader(msg.L2msg), chainId, msg.BatchGasCost)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -370,17 +368,16 @@ func parseSubmitRetryableMessage(rd io.Reader, header *arbostypes.L1IncomingMess
return types.NewTx(tx), err
}

func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasCost *uint64, batchFetcher InfallibleBatchFetcher) (*types.Transaction, error) {
batchTimestamp, batchPosterAddr, batchHash, batchNum, l1BaseFee, extraGas, err := arbostypes.ParseBatchPostingReportMessageFields(rd)
func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasCost *uint64) (*types.Transaction, error) {
batchTimestamp, batchPosterAddr, _, batchNum, l1BaseFee, extraGas, err := arbostypes.ParseBatchPostingReportMessageFields(rd)
if err != nil {
return nil, err
}
var batchDataGas uint64
if msgBatchGasCost != nil {
batchDataGas = *msgBatchGasCost
} else {
batchData := batchFetcher(batchNum, batchHash)
batchDataGas = arbostypes.ComputeBatchGasCost(batchData)
return nil, errors.New("cannot compute batch gas cost")
}
batchDataGas = arbmath.SaturatingUAdd(batchDataGas, extraGas)

Expand Down
17 changes: 12 additions & 5 deletions cmd/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ func main() {
panic(fmt.Sprintf("Error opening state db: %v", err.Error()))
}

batchFetcher := func(batchNum uint64) ([]byte, error) {
currentBatch := wavmio.GetInboxPosition()
if batchNum > currentBatch {
return nil, fmt.Errorf("invalid batch fetch request %d, max %d", batchNum, currentBatch)
}
return wavmio.ReadInboxMessage(batchNum), nil
}
readMessage := func(dasEnabled bool) *arbostypes.MessageWithMetadata {
var delayedMessagesRead uint64
if lastBlockHeader != nil {
Expand Down Expand Up @@ -232,6 +239,10 @@ func main() {
panic(fmt.Sprintf("Error reading from inbox multiplexer: %v", err.Error()))
}

err = message.Message.FillInBatchGasCost(batchFetcher)
if err != nil {
message.Message = arbostypes.InvalidL1Message
}
return message
}

Expand Down Expand Up @@ -280,14 +291,10 @@ func main() {
message := readMessage(chainConfig.ArbitrumChainParams.DataAvailabilityCommittee)

chainContext := WavmChainContext{}
batchFetcher := func(batchNum uint64) ([]byte, error) {
return wavmio.ReadInboxMessage(batchNum), nil
}
newBlock, _, err = arbos.ProduceBlock(message.Message, message.DelayedMessagesRead, lastBlockHeader, statedb, chainContext, chainConfig, batchFetcher, false)
newBlock, _, err = arbos.ProduceBlock(message.Message, message.DelayedMessagesRead, lastBlockHeader, statedb, chainContext, chainConfig, false)
if err != nil {
panic(err)
}

} else {
// Initialize ArbOS with this init message and create the genesis block.

Expand Down
19 changes: 0 additions & 19 deletions execution/gethexec/block_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/validator"
)

// BlockRecorder uses a separate statedatabase from the blockchain.
Expand Down Expand Up @@ -120,31 +119,14 @@ func (r *BlockRecorder) RecordBlockCreation(
}

var blockHash common.Hash
var readBatchInfo []validator.BatchInfo
if msg != nil {
batchFetcher := func(batchNum uint64) ([]byte, error) {
data, blockHash, err := r.execEngine.consensus.FetchBatch(ctx, batchNum)
if err != nil {
return nil, err
}
readBatchInfo = append(readBatchInfo, validator.BatchInfo{
Number: batchNum,
BlockHash: blockHash,
Data: data,
})
return data, nil
}
// Re-fetch the batch instead of using our cached cost,
// as the replay binary won't have the cache populated.
msg.Message.BatchGasCost = nil
block, _, err := arbos.ProduceBlock(
msg.Message,
msg.DelayedMessagesRead,
prevHeader,
recordingdb,
chaincontext,
chainConfig,
batchFetcher,
false,
)
if err != nil {
Expand Down Expand Up @@ -172,7 +154,6 @@ func (r *BlockRecorder) RecordBlockCreation(
Pos: pos,
BlockHash: blockHash,
Preimages: preimages,
BatchInfo: readBatchInfo,
UserWasms: recordingdb.UserWasms(),
}, err
}
Expand Down
9 changes: 1 addition & 8 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ func (s *ExecutionEngine) resequenceReorgedMessages(messages []*arbostypes.Messa
log.Warn("skipping non-standard sequencer message found from reorg", "header", header)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want Lee's take on this,
But I think around line 275, when resequencing delayed messages, we should check if the delayed this is a BatchPostingReport message, and if so skip it and not resequence.

Copy link
Contributor Author

@diegoximenes diegoximenes Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in a meeting, this behavior will not be changed in this PR.
Maybe this will be revisited in a future PR.

continue
}
// We don't need a batch fetcher as this is an L2 message
txes, err := arbos.ParseL2Transactions(msg.Message, s.bc.Config().ChainID, nil)
txes, err := arbos.ParseL2Transactions(msg.Message, s.bc.Config().ChainID)
if err != nil {
log.Warn("failed to parse sequencer message found from reorg", "err", err)
continue
Expand Down Expand Up @@ -625,19 +624,13 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith
statedb.StartPrefetcher("TransactionStreamer")
defer statedb.StopPrefetcher()

batchFetcher := func(num uint64) ([]byte, error) {
data, _, err := s.consensus.FetchBatch(s.GetContext(), num)
return data, err
}

block, receipts, err := arbos.ProduceBlock(
msg.Message,
msg.DelayedMessagesRead,
currentHeader,
statedb,
s.bc,
s.bc.Config(),
batchFetcher,
isMsgForPrefetch,
)

Expand Down
Loading
Loading