Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
ImJeremyHe committed Nov 27, 2024
1 parent d683e16 commit 47d6014
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 57 deletions.
9 changes: 6 additions & 3 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ func AccessList(opts *AccessListOpts) types.AccessList {
return l
}

var EspressoFetchMerkleRootErr = errors.New("failed to fetch the espresso merkle roof")

// Adds a block merkle proof to an Espresso justification, providing a proof that a set of transactions
// hashes to some light client state root.
func (b *BatchPoster) checkEspressoValidation(
Expand Down Expand Up @@ -587,8 +589,6 @@ func (b *BatchPoster) checkEspressoValidation(
return nil
}

log.Warn("this message has not been finalized on L1 or validated")

if b.streamer.UseEscapeHatch {
skip, err := b.streamer.getSkipVerificationPos()
if err != nil {
Expand All @@ -612,7 +612,7 @@ func (b *BatchPoster) checkEspressoValidation(
return nil
}

return fmt.Errorf("waiting for espresso finalization, pos: %d", b.building.msgCount)
return fmt.Errorf("%w (height: %d)", EspressoFetchMerkleRootErr, b.building.msgCount)
}

func (b *BatchPoster) submitEspressoTransactionPos(pos arbutil.MessageIndex) error {
Expand Down Expand Up @@ -1815,12 +1815,14 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute)
normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute)
accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute)
espressoEphemeralErrorHandler := util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour)
resetAllEphemeralErrs := func() {
commonEphemeralErrorHandler.Reset()
exceedMaxMempoolSizeEphemeralErrorHandler.Reset()
storageRaceEphemeralErrorHandler.Reset()
normalGasEstimationFailedEphemeralErrorHandler.Reset()
accumulatorNotFoundEphemeralErrorHandler.Reset()
espressoEphemeralErrorHandler.Reset()
}
b.CallIteratively(func(ctx context.Context) time.Duration {
var err error
Expand Down Expand Up @@ -1875,6 +1877,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = espressoEphemeralErrorHandler.LogLevel(err, logLevel)
// If the error matches one of these, it's only logged at debug for the first minute,
// then at warn for the next 4 minutes, then at error. If the error isn't one of these,
// it'll be logged at warn for the first minute, then at error.
Expand Down
107 changes: 53 additions & 54 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
lightclient "github.com/EspressoSystems/espresso-sequencer-go/light-client"
tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64"
"github.com/offchainlabs/nitro/espressocrypto"
"github.com/offchainlabs/nitro/util"

espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client"
espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types"
Expand Down Expand Up @@ -1257,138 +1258,122 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc

// Check if the latest submitted transaction has been finalized on L1 and verify it.
// Return a bool indicating whether a new transaction can be submitted to HotShot
func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) bool {
func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) error {
submittedTxnPos, err := s.getEspressoSubmittedPos()
if err != nil {
log.Warn("submitted pos not found", "err", err)
return false
return fmt.Errorf("submitted pos not found: %w", err)
}
if len(submittedTxnPos) == 0 {
return true // no submitted transaction
return nil // no submitted transaction, treated as successful
}

submittedTxHash, err := s.getEspressoSubmittedHash()
if err != nil {
log.Warn("submitted hash not found", "err", err)
return false
return fmt.Errorf("submitted hash not found: %w", err)
}

if submittedTxHash == nil {
// this should not happen
log.Warn("missing the tx hash while the submitted txn position exists")
return false
return errors.New("missing the tx hash while the submitted txn position exists")
}

data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash)
if err != nil {
log.Warn("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String())
return false
return fmt.Errorf("failed to fetch the submitted transaction hash (hash: %s): %w", submittedTxHash.String(), err)
}

height := data.BlockHeight

header, err := s.espressoClient.FetchHeaderByHeight(ctx, height)
if err != nil {
log.Warn("could not get the header", "height", height, "err", err)
return false
return fmt.Errorf("could not get the header (height: %d): %w", height, err)
}

// Verify the namespace proof
resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64())
if err != nil {
log.Warn("failed to fetch the transactions in block, will retry", "err", err)
return false
return fmt.Errorf("failed to fetch the transactions in block (height: %d): %w", height, err)
}

msgs := []arbostypes.L1IncomingMessage{}
for _, p := range submittedTxnPos {
msg, err := s.GetMessage(p)
if err != nil {
log.Error("failed to get the message in tx streamer", "pos", p)
return false
return fmt.Errorf("failed to get the message in tx streamer (pos: %d): %w", p, err)
}
if msg.Message != nil {
msgs = append(msgs, *msg.Message)
}
}

// Rebuild the hotshot payload with messages to check if it is finalizied
payload, length := arbos.BuildHotShotPayload(&msgs)
if length != len(msgs) {
log.Error("failed to rebuild the hotshot payload, it is expected rebuild the transaction within all messages")
return false
return errors.New("failed to rebuild the hotshot payload; the number of messages does not match the expected length")
}

namespaceOk := espressocrypto.VerifyNamespace(s.chainConfig.ChainID.Uint64(), resp.Proof, *header.Header.GetPayloadCommitment(), *header.Header.GetNsTable(), []espressoTypes.Bytes{payload}, resp.VidCommon)
namespaceOk := espressocrypto.VerifyNamespace(
s.chainConfig.ChainID.Uint64(),
resp.Proof,
*header.Header.GetPayloadCommitment(),
*header.Header.GetNsTable(),
[]espressoTypes.Bytes{payload},
resp.VidCommon,
)
if !namespaceOk {
log.Error("error validating namespace proof", "height", height)
return false
return fmt.Errorf("error validating namespace proof (height: %d)", height)
}

// Verify the merkle tree proof
snapshot, err := s.lightClientReader.FetchMerkleRoot(height, nil)
if err != nil {
log.Warn("could not get the merkle root", "height", height, "err", err)
return false
return fmt.Errorf("%w (height: %d): %w", EspressoFetchMerkleRootErr, height, err)
}

if snapshot.Height <= height {
return false
return errors.New("snapshot height is less than or equal to transaction height")
}

nextHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, snapshot.Height)
if err != nil {
log.Warn("error fetching the snapshot header", "height", snapshot.Height, "err", err)
return false
return fmt.Errorf("error fetching the snapshot header (height: %d): %w", snapshot.Height, err)
}

proof, err := s.espressoClient.FetchBlockMerkleProof(ctx, snapshot.Height, height)
if err != nil {
log.Warn("error fetching the block merkle proof", "height", height, "root height", snapshot.Height, "err", err)
return false
return fmt.Errorf("error fetching the block merkle proof (height: %d, root height: %d): %w", height, snapshot.Height, err)
}

blockMerkleTreeRoot := nextHeader.Header.GetBlockMerkleTreeRoot()
jstHeader, err := json.Marshal(header)
if err != nil {
log.Error("Failed to Marshal the header")
return false
return fmt.Errorf("failed to marshal the header: %w", err)
}

ok := espressocrypto.VerifyMerkleProof(proof.Proof, jstHeader, *blockMerkleTreeRoot, snapshot.Root)
if !ok {
log.Error("error validating merkle proof", "height", height, "snapshot height", snapshot.Height)
return false
return fmt.Errorf("error validating merkle proof (height: %d, snapshot height: %d)", height, snapshot.Height)
}

// Validation completed. Update the database
s.espressoTxnsStateInsertionMutex.Lock()
defer s.espressoTxnsStateInsertionMutex.Unlock()

batch := s.db.NewBatch()
err = s.setEspressoSubmittedPos(batch, nil)
if err != nil {
log.Warn("failed to set the submitted pos to nil", "err", err)
return false
if err := s.setEspressoSubmittedPos(batch, nil); err != nil {
return fmt.Errorf("failed to set the submitted pos to nil: %w", err)
}
err = s.setEspressoSubmittedHash(batch, nil)
if err != nil {
log.Warn("failed to set the submitted hash to nil", "err", err)
return false
if err := s.setEspressoSubmittedHash(batch, nil); err != nil {
return fmt.Errorf("failed to set the submitted hash to nil: %w", err)
}
lastConfirmedPos := submittedTxnPos[len(submittedTxnPos)-1]
err = s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos)
if err != nil {
log.Warn("failed to set the last confirmed position", "err", err, "pos", lastConfirmedPos)
return false
if err := s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos); err != nil {
return fmt.Errorf("failed to set the last confirmed position (pos: %d): %w", lastConfirmedPos, err)
}

err = batch.Write()
if err != nil {
log.Error("failed to write to db", "err", err)
return false
if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write to db: %w", err)
}
log.Info("Finality message", "pos", submittedTxnPos, "tx", submittedTxHash.String())
return true

return nil
}

func (s *TransactionStreamer) getEspressoSubmittedPos() ([]arbutil.MessageIndex, error) {
Expand Down Expand Up @@ -1785,6 +1770,8 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
return nil
}

var espressoEphemeralErrorHandler = util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour)

func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct{}) time.Duration {
retryRate := s.espressoTxnsPollingInterval * 50
enabledEspresso, err := s.isEspressoMode()
Expand All @@ -1797,9 +1784,21 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
log.Error("error checking escape hatch", "err", err)
return retryRate
}
canSubmit := s.pollSubmittedTransactionForFinality(ctx)
err = s.pollSubmittedTransactionForFinality(ctx)
if err != nil {
if ctx.Err() != nil {
return 0
}
logLevel := log.Error
logLevel = espressoEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel("error polling finality", "err", err)
return retryRate
} else {
espressoEphemeralErrorHandler.Reset()
}

shouldSubmit := s.shouldSubmitEspressoTransaction()
if canSubmit && shouldSubmit {
if shouldSubmit {
return s.submitEspressoTransactions(ctx)
}

Expand Down

0 comments on commit 47d6014

Please sign in to comment.