Skip to content

Commit

Permalink
Include the invalid txs in message and fix parsing batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ImJeremyHe committed Nov 21, 2023
1 parent c3c566d commit 7c4fb85
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 16 deletions.
25 changes: 22 additions & 3 deletions arbos/parse_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arbos

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -96,6 +97,7 @@ const (
L2MessageKind_Heartbeat = 6 // deprecated
L2MessageKind_SignedCompressedTx = 7
// 8 is reserved for BLS signed batch
L2MessageKind_EspressoTx = 10
)

// Warning: this does not validate the day of the week or if DST is being observed
Expand Down Expand Up @@ -151,10 +153,12 @@ func parseL2Message(rd io.Reader, poster common.Address, timestamp uint64, reque
}
nestedSegments, err := parseL2Message(bytes.NewReader(nextMsg), poster, timestamp, nextRequestId, chainId, depth+1)
if err != nil {
return nil, err
log.Warn("Failed to parse L2Message in a patch")
}
if nestedSegments != nil && nestedSegments.Len() > 0 {
segments = append(segments, nestedSegments...)
index.Add(index, big.NewInt(1))
}
segments = append(segments, nestedSegments...)
index.Add(index, big.NewInt(1))
}
case L2MessageKind_SignedTx:
newTx := new(types.Transaction)
Expand All @@ -171,6 +175,21 @@ func parseL2Message(rd io.Reader, poster common.Address, timestamp uint64, reque
return nil, types.ErrTxTypeNotSupported
}
return types.Transactions{newTx}, nil
case L2MessageKind_EspressoTx:
newTx := new(types.Transaction)
// Safe to read in its entirety, as all input readers are limited
readBytes, err := io.ReadAll(rd)
if err != nil {
return nil, err
}
if err := json.Unmarshal(readBytes, &newTx); err != nil {
return nil, err
}
if newTx.Type() >= types.ArbitrumDepositTxType {
// Should be unreachable due to not accepting Arbitrum internal txs
return nil, types.ErrTxTypeNotSupported
}
return types.Transactions{newTx}, nil
case L2MessageKind_Heartbeat:
if timestamp >= HeartbeatsDisabledAt {
return nil, errors.New("heartbeat messages have been disabled")
Expand Down
48 changes: 35 additions & 13 deletions execution/gethexec/espresso_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package gethexec

import (
"context"
"encoding/json"
"encoding/binary"
"time"

"github.com/offchainlabs/nitro/espresso"
Expand Down Expand Up @@ -78,7 +78,6 @@ func (s *EspressoSequencer) makeSequencingHooks() *arbos.SequencingHooks {

func (s *EspressoSequencer) createBlock(ctx context.Context) (returnValue bool) {
nextSeqBlockNum := s.hotShotState.nextSeqBlockNum
log.Info("Attempting to sequence Espresso block", "block_num", nextSeqBlockNum)
header, err := s.hotShotState.client.FetchHeader(ctx, nextSeqBlockNum)
if err != nil {
log.Warn("Unable to fetch header for block number, will retry", "block_num", nextSeqBlockNum)
Expand All @@ -89,16 +88,6 @@ func (s *EspressoSequencer) createBlock(ctx context.Context) (returnValue bool)
log.Error("Error fetching transactions", "err", err)
return false

}
var txes types.Transactions
for _, tx := range arbTxns.Transactions {
var out types.Transaction
if err := json.Unmarshal(tx, &out); err != nil {
log.Error("Failed to serialize")
return false
}
txes = append(txes, &out)

}

arbHeader := &arbostypes.L1IncomingMessageHeader{
Expand All @@ -111,8 +100,10 @@ func (s *EspressoSequencer) createBlock(ctx context.Context) (returnValue bool)
// TODO: add justification https://github.com/EspressoSystems/espresso-sequencer/issues/733
}

msg := messageFromEspresso(arbHeader, arbTxns)

hooks := s.makeSequencingHooks()
_, err = s.execEngine.SequenceTransactions(arbHeader, txes, hooks)
_, err = s.execEngine.SequenceMessage(msg, hooks)
if err != nil {
log.Error("Sequencing error for block number", "block_num", nextSeqBlockNum, "err", err)
return false
Expand Down Expand Up @@ -165,3 +156,34 @@ func (s *EspressoSequencer) preTxFilter(_ *params.ChainConfig, _ *types.Header,
func (s *EspressoSequencer) postTxFilter(_ *types.Header, _ *arbosState.ArbosState, _ *types.Transaction, _ common.Address, _ uint64, _ *core.ExecutionResult) error {
return nil
}

// messageFromEspresso serializes the raw data of the transactions from the espresso block into an arbitrum message.
// In other words, all the malformed and invalid transactions are included in the message, which means
// - validators can rebuild a block from this message
// - validators can check the espresso commitment
//
// Refer to `execution/gethexec/executionengine.go messageFromTxes`
func messageFromEspresso(header *arbostypes.L1IncomingMessageHeader, txesInBlock espresso.TransactionsInBlock) arbostypes.L1IncomingMessage {
var l2Message []byte

txes := txesInBlock.Transactions
if len(txes) == 1 {
l2Message = append(l2Message, arbos.L2MessageKind_EspressoTx)
l2Message = append(l2Message, txes[0]...)
} else {
l2Message = append(l2Message, arbos.L2MessageKind_Batch)
sizeBuf := make([]byte, 8)
for _, tx := range txes {
binary.BigEndian.PutUint64(sizeBuf, uint64(len(tx)+1))
l2Message = append(l2Message, sizeBuf...)
l2Message = append(l2Message, arbos.L2MessageKind_EspressoTx)
l2Message = append(l2Message, tx...)
}

}

return arbostypes.L1IncomingMessage{
Header: header,
L2msg: l2Message,
}
}
53 changes: 53 additions & 0 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,59 @@ func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMess
})
}

func (s *ExecutionEngine) SequenceMessage(msg arbostypes.L1IncomingMessage, hooks *arbos.SequencingHooks) (*types.Block, error) {
return s.sequencerWrapper(func() (*types.Block, error) {
lastBlockHeader, err := s.getCurrentHeader()
if err != nil {
return nil, err
}

statedb, err := s.bc.StateAt(lastBlockHeader.Root)
if err != nil {
return nil, err
}

delayedMessagesRead := lastBlockHeader.Nonce.Uint64()

startTime := time.Now()
block, receipts, err := arbos.ProduceBlock(

Check failure on line 293 in execution/gethexec/executionengine.go

View workflow job for this annotation

GitHub Actions / Go Tests (defaults)

ineffectual assignment to err (ineffassign)

Check failure on line 293 in execution/gethexec/executionengine.go

View workflow job for this annotation

GitHub Actions / Go Tests (challenge)

ineffectual assignment to err (ineffassign)

Check failure on line 293 in execution/gethexec/executionengine.go

View workflow job for this annotation

GitHub Actions / Go Tests (race)

ineffectual assignment to err (ineffassign)
&msg,
delayedMessagesRead,
lastBlockHeader,
statedb,
s.bc,
s.bc.Config(),
func(batchNum uint64) ([]byte, error) { return nil, errors.New("invalid tx") },
)
blockCalcTime := time.Since(startTime)

pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1)
if err != nil {
return nil, err
}

msgWithMeta := arbostypes.MessageWithMetadata{
Message: &msg,
DelayedMessagesRead: delayedMessagesRead,
}

err = s.streamer.WriteMessageFromSequencer(pos, msgWithMeta)
if err != nil {
return nil, err
}

// Only write the block after we've written the messages, so if the node dies in the middle of this,
// it will naturally recover on startup by regenerating the missing block.
err = s.appendBlock(block, statedb, receipts, blockCalcTime)
if err != nil {
return nil, err
}

return block, nil

})
}

func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
lastBlockHeader, err := s.getCurrentHeader()
if err != nil {
Expand Down

0 comments on commit 7c4fb85

Please sign in to comment.