Skip to content

Commit

Permalink
Clean up (#351)
Browse files Browse the repository at this point in the history
* Clean up

* Remove changes to parse_l2.go

* Clean up

* remove attestation log

---------

Co-authored-by: Sneh Koul <[email protected]>
  • Loading branch information
ImJeremyHe and Sneh1999 authored Dec 2, 2024
1 parent d683e16 commit 78e1499
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 89 deletions.
11 changes: 7 additions & 4 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,9 @@ func AccessList(opts *AccessListOpts) types.AccessList {
return l
}

var EspressoFetchMerkleRootErr = errors.New("failed to fetch the espresso merkle roof")
var EspressoFetchTransactionErr = errors.New("failed to fetch the espresso transaction")

// Adds a block merkle proof to an Espresso justification, providing a proof that a set of transactions
// hashes to some light client state root.
func (b *BatchPoster) checkEspressoValidation(
Expand Down Expand Up @@ -587,8 +590,6 @@ func (b *BatchPoster) checkEspressoValidation(
return nil
}

log.Warn("this message has not been finalized on L1 or validated")

if b.streamer.UseEscapeHatch {
skip, err := b.streamer.getSkipVerificationPos()
if err != nil {
Expand All @@ -612,7 +613,7 @@ func (b *BatchPoster) checkEspressoValidation(
return nil
}

return fmt.Errorf("waiting for espresso finalization, pos: %d", b.building.msgCount)
return fmt.Errorf("%w (height: %d)", EspressoFetchMerkleRootErr, b.building.msgCount)
}

func (b *BatchPoster) submitEspressoTransactionPos(pos arbutil.MessageIndex) error {
Expand Down Expand Up @@ -1203,7 +1204,6 @@ func (b *BatchPoster) getAttestationQuote(userData []byte) ([]byte, error) {
return []byte{}, fmt.Errorf("failed to read quote file: %w", err)
}

log.Info("Attestation quote generated", "quote", hex.EncodeToString(attestationQuote))
return attestationQuote, nil
}

Expand Down Expand Up @@ -1815,12 +1815,14 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute)
normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute)
accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute)
espressoEphemeralErrorHandler := util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour)
resetAllEphemeralErrs := func() {
commonEphemeralErrorHandler.Reset()
exceedMaxMempoolSizeEphemeralErrorHandler.Reset()
storageRaceEphemeralErrorHandler.Reset()
normalGasEstimationFailedEphemeralErrorHandler.Reset()
accumulatorNotFoundEphemeralErrorHandler.Reset()
espressoEphemeralErrorHandler.Reset()
}
b.CallIteratively(func(ctx context.Context) time.Duration {
var err error
Expand Down Expand Up @@ -1875,6 +1877,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = espressoEphemeralErrorHandler.LogLevel(err, logLevel)
// If the error matches one of these, it's only logged at debug for the first minute,
// then at warn for the next 4 minutes, then at error. If the error isn't one of these,
// it'll be logged at warn for the first minute, then at error.
Expand Down
147 changes: 88 additions & 59 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
lightclient "github.com/EspressoSystems/espresso-sequencer-go/light-client"
tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64"
"github.com/offchainlabs/nitro/espressocrypto"
"github.com/offchainlabs/nitro/util"

espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client"
espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
flag "github.com/spf13/pflag"

"github.com/offchainlabs/nitro/arbos"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
Expand Down Expand Up @@ -1257,138 +1257,122 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc

// Check if the latest submitted transaction has been finalized on L1 and verify it.
// Return a bool indicating whether a new transaction can be submitted to HotShot
func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) bool {
func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) error {
submittedTxnPos, err := s.getEspressoSubmittedPos()
if err != nil {
log.Warn("submitted pos not found", "err", err)
return false
return fmt.Errorf("submitted pos not found: %w", err)
}
if len(submittedTxnPos) == 0 {
return true // no submitted transaction
return nil // no submitted transaction, treated as successful
}

submittedTxHash, err := s.getEspressoSubmittedHash()
if err != nil {
log.Warn("submitted hash not found", "err", err)
return false
return fmt.Errorf("submitted hash not found: %w", err)
}

if submittedTxHash == nil {
// this should not happen
log.Warn("missing the tx hash while the submitted txn position exists")
return false
return errors.New("missing the tx hash while the submitted txn position exists")
}

data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash)
if err != nil {
log.Warn("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String())
return false
return fmt.Errorf("failed to fetch the submitted transaction hash (hash: %s): %w", submittedTxHash.String(), err)
}

height := data.BlockHeight

header, err := s.espressoClient.FetchHeaderByHeight(ctx, height)
if err != nil {
log.Warn("could not get the header", "height", height, "err", err)
return false
return fmt.Errorf("could not get the header (height: %d): %w", height, err)
}

// Verify the namespace proof
resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64())
if err != nil {
log.Warn("failed to fetch the transactions in block, will retry", "err", err)
return false
return fmt.Errorf("failed to fetch the transactions in block (height: %d): %w", height, err)
}

msgs := []arbostypes.L1IncomingMessage{}
for _, p := range submittedTxnPos {
msg, err := s.GetMessage(p)
if err != nil {
log.Error("failed to get the message in tx streamer", "pos", p)
return false
return fmt.Errorf("failed to get the message in tx streamer (pos: %d): %w", p, err)
}
if msg.Message != nil {
msgs = append(msgs, *msg.Message)
}
}

// Rebuild the hotshot payload with messages to check if it is finalizied
payload, length := arbos.BuildHotShotPayload(&msgs)
payload, length := buildHotShotPayload(&msgs)
if length != len(msgs) {
log.Error("failed to rebuild the hotshot payload, it is expected rebuild the transaction within all messages")
return false
return errors.New("failed to rebuild the hotshot payload; the number of messages does not match the expected length")
}

namespaceOk := espressocrypto.VerifyNamespace(s.chainConfig.ChainID.Uint64(), resp.Proof, *header.Header.GetPayloadCommitment(), *header.Header.GetNsTable(), []espressoTypes.Bytes{payload}, resp.VidCommon)
namespaceOk := espressocrypto.VerifyNamespace(
s.chainConfig.ChainID.Uint64(),
resp.Proof,
*header.Header.GetPayloadCommitment(),
*header.Header.GetNsTable(),
[]espressoTypes.Bytes{payload},
resp.VidCommon,
)
if !namespaceOk {
log.Error("error validating namespace proof", "height", height)
return false
return fmt.Errorf("error validating namespace proof (height: %d)", height)
}

// Verify the merkle tree proof
snapshot, err := s.lightClientReader.FetchMerkleRoot(height, nil)
if err != nil {
log.Warn("could not get the merkle root", "height", height, "err", err)
return false
return fmt.Errorf("%w (height: %d): %w", EspressoFetchMerkleRootErr, height, err)
}

if snapshot.Height <= height {
return false
return errors.New("snapshot height is less than or equal to transaction height")
}

nextHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, snapshot.Height)
if err != nil {
log.Warn("error fetching the snapshot header", "height", snapshot.Height, "err", err)
return false
return fmt.Errorf("error fetching the snapshot header (height: %d): %w", snapshot.Height, err)
}

proof, err := s.espressoClient.FetchBlockMerkleProof(ctx, snapshot.Height, height)
if err != nil {
log.Warn("error fetching the block merkle proof", "height", height, "root height", snapshot.Height, "err", err)
return false
return fmt.Errorf("error fetching the block merkle proof (height: %d, root height: %d): %w", height, snapshot.Height, err)
}

blockMerkleTreeRoot := nextHeader.Header.GetBlockMerkleTreeRoot()
jstHeader, err := json.Marshal(header)
if err != nil {
log.Error("Failed to Marshal the header")
return false
return fmt.Errorf("failed to marshal the header: %w", err)
}

ok := espressocrypto.VerifyMerkleProof(proof.Proof, jstHeader, *blockMerkleTreeRoot, snapshot.Root)
if !ok {
log.Error("error validating merkle proof", "height", height, "snapshot height", snapshot.Height)
return false
return fmt.Errorf("error validating merkle proof (height: %d, snapshot height: %d)", height, snapshot.Height)
}

// Validation completed. Update the database
s.espressoTxnsStateInsertionMutex.Lock()
defer s.espressoTxnsStateInsertionMutex.Unlock()

batch := s.db.NewBatch()
err = s.setEspressoSubmittedPos(batch, nil)
if err != nil {
log.Warn("failed to set the submitted pos to nil", "err", err)
return false
if err := s.setEspressoSubmittedPos(batch, nil); err != nil {
return fmt.Errorf("failed to set the submitted pos to nil: %w", err)
}
err = s.setEspressoSubmittedHash(batch, nil)
if err != nil {
log.Warn("failed to set the submitted hash to nil", "err", err)
return false
if err := s.setEspressoSubmittedHash(batch, nil); err != nil {
return fmt.Errorf("failed to set the submitted hash to nil: %w", err)
}
lastConfirmedPos := submittedTxnPos[len(submittedTxnPos)-1]
err = s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos)
if err != nil {
log.Warn("failed to set the last confirmed position", "err", err, "pos", lastConfirmedPos)
return false
if err := s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos); err != nil {
return fmt.Errorf("failed to set the last confirmed position (pos: %d): %w", lastConfirmedPos, err)
}

err = batch.Write()
if err != nil {
log.Error("failed to write to db", "err", err)
return false
if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write to db: %w", err)
}
log.Info("Finality message", "pos", submittedTxnPos, "tx", submittedTxHash.String())
return true

return nil
}

func (s *TransactionStreamer) getEspressoSubmittedPos() ([]arbutil.MessageIndex, error) {
Expand Down Expand Up @@ -1639,13 +1623,13 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
msgs = append(msgs, *msg.Message)
}
}
payload, msgCnt := arbos.BuildHotShotPayload(&msgs)
payload, msgCnt := buildHotShotPayload(&msgs)
if msgCnt == 0 {
log.Error("failed to build the hotshot transaction: a large message has exceeded the size limit")
return s.espressoTxnsPollingInterval
}

log.Info("submitting transaction to espresso using sovereign sequencer")
log.Info("submitting transaction to hotshot for finalization")

// Note: same key should not be used for two namespaces for this to work
hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
Expand Down Expand Up @@ -1785,6 +1769,16 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
return nil
}

var espressoMerkleProofEphemeralErrorHandler = util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour)
var espressoTransactionEphemeralErrorHandler = util.NewEphemeralErrorHandler(3*time.Minute, EspressoFetchTransactionErr.Error(), time.Minute)

func getLogLevel(err error) func(string, ...interface{}) {
logLevel := log.Error
logLevel = espressoMerkleProofEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = espressoTransactionEphemeralErrorHandler.LogLevel(err, logLevel)
return logLevel
}

func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct{}) time.Duration {
retryRate := s.espressoTxnsPollingInterval * 50
enabledEspresso, err := s.isEspressoMode()
Expand All @@ -1794,12 +1788,27 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
if enabledEspresso {
err := s.toggleEscapeHatch(ctx)
if err != nil {
log.Error("error checking escape hatch", "err", err)
if ctx.Err() != nil {
return 0
}
logLevel := getLogLevel(err)
logLevel("error checking escape hatch, will retry", "err", err)
return retryRate
}
err = s.pollSubmittedTransactionForFinality(ctx)
if err != nil {
if ctx.Err() != nil {
return 0
}
logLevel := getLogLevel(err)
logLevel("error polling finality", "err", err)
return retryRate
} else {
espressoMerkleProofEphemeralErrorHandler.Reset()
}
canSubmit := s.pollSubmittedTransactionForFinality(ctx)

shouldSubmit := s.shouldSubmitEspressoTransaction()
if canSubmit && shouldSubmit {
if shouldSubmit {
return s.submitEspressoTransactions(ctx)
}

Expand All @@ -1823,3 +1832,23 @@ func (s *TransactionStreamer) Start(ctxIn context.Context) error {

return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier)
}

const ESPRESSO_TRANSACTION_SIZE_LIMIT int = 10 * 1024

func buildHotShotPayload(msgs *[]arbostypes.L1IncomingMessage) (espressoTypes.Bytes, int) {
payload := []byte{}
msgCnt := 0

sizeBuf := make([]byte, 8)
for _, msg := range *msgs {
if len(payload) >= ESPRESSO_TRANSACTION_SIZE_LIMIT {
break
}
msgByte := msg.L2msg
binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgByte)))
payload = append(payload, sizeBuf...)
payload = append(payload, msgByte...)
msgCnt += 1
}
return payload, msgCnt
}
22 changes: 0 additions & 22 deletions arbos/parse_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package arbos

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math/big"
"time"

espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
Expand All @@ -19,8 +17,6 @@ import (
"github.com/offchainlabs/nitro/util/arbmath"
)

const ESPRESSO_TRANSACTION_SIZE_LIMIT int = 10 * 1024

func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int) (types.Transactions, error) {
if len(msg.L2msg) > arbostypes.MaxL2MessageSize {
// ignore the message if l2msg is too large
Expand Down Expand Up @@ -397,21 +393,3 @@ func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasC
// don't need to fill in the other fields, since they exist only to ensure uniqueness, and batchNum is already unique
}), nil
}

func BuildHotShotPayload(msgs *[]arbostypes.L1IncomingMessage) (espressoTypes.Bytes, int) {
payload := []byte{}
msgCnt := 0

sizeBuf := make([]byte, 8)
for _, msg := range *msgs {
if len(payload) >= ESPRESSO_TRANSACTION_SIZE_LIMIT {
break
}
msgByte := msg.L2msg
binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgByte)))
payload = append(payload, sizeBuf...)
payload = append(payload, msgByte...)
msgCnt += 1
}
return payload, msgCnt
}
Loading

0 comments on commit 78e1499

Please sign in to comment.