From 47d6014c63576b107526f90fd9f016281179819a Mon Sep 17 00:00:00 2001 From: ImJeremyHe Date: Wed, 27 Nov 2024 12:19:28 +0800 Subject: [PATCH] Clean up --- arbnode/batch_poster.go | 9 ++- arbnode/transaction_streamer.go | 107 ++++++++++++++++---------------- 2 files changed, 59 insertions(+), 57 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index dbe1b7b930..db5b75bda7 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -557,6 +557,8 @@ func AccessList(opts *AccessListOpts) types.AccessList { return l } +var EspressoFetchMerkleRootErr = errors.New("failed to fetch the espresso merkle roof") + // Adds a block merkle proof to an Espresso justification, providing a proof that a set of transactions // hashes to some light client state root. func (b *BatchPoster) checkEspressoValidation( @@ -587,8 +589,6 @@ func (b *BatchPoster) checkEspressoValidation( return nil } - log.Warn("this message has not been finalized on L1 or validated") - if b.streamer.UseEscapeHatch { skip, err := b.streamer.getSkipVerificationPos() if err != nil { @@ -612,7 +612,7 @@ func (b *BatchPoster) checkEspressoValidation( return nil } - return fmt.Errorf("waiting for espresso finalization, pos: %d", b.building.msgCount) + return fmt.Errorf("%w (height: %d)", EspressoFetchMerkleRootErr, b.building.msgCount) } func (b *BatchPoster) submitEspressoTransactionPos(pos arbutil.MessageIndex) error { @@ -1815,12 +1815,14 @@ func (b *BatchPoster) Start(ctxIn context.Context) { storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute) normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute) accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute) + espressoEphemeralErrorHandler := util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour) resetAllEphemeralErrs := func() { commonEphemeralErrorHandler.Reset() exceedMaxMempoolSizeEphemeralErrorHandler.Reset() storageRaceEphemeralErrorHandler.Reset() normalGasEstimationFailedEphemeralErrorHandler.Reset() accumulatorNotFoundEphemeralErrorHandler.Reset() + espressoEphemeralErrorHandler.Reset() } b.CallIteratively(func(ctx context.Context) time.Duration { var err error @@ -1875,6 +1877,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = espressoEphemeralErrorHandler.LogLevel(err, logLevel) // If the error matches one of these, it's only logged at debug for the first minute, // then at warn for the next 4 minutes, then at error. If the error isn't one of these, // it'll be logged at warn for the first minute, then at error. diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 00f04eb11d..59110031fd 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -19,6 +19,7 @@ import ( lightclient "github.com/EspressoSystems/espresso-sequencer-go/light-client" tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64" "github.com/offchainlabs/nitro/espressocrypto" + "github.com/offchainlabs/nitro/util" espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client" espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types" @@ -1257,107 +1258,99 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc // Check if the latest submitted transaction has been finalized on L1 and verify it. // Return a bool indicating whether a new transaction can be submitted to HotShot -func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) bool { +func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) error { submittedTxnPos, err := s.getEspressoSubmittedPos() if err != nil { - log.Warn("submitted pos not found", "err", err) - return false + return fmt.Errorf("submitted pos not found: %w", err) } if len(submittedTxnPos) == 0 { - return true // no submitted transaction + return nil // no submitted transaction, treated as successful } + submittedTxHash, err := s.getEspressoSubmittedHash() if err != nil { - log.Warn("submitted hash not found", "err", err) - return false + return fmt.Errorf("submitted hash not found: %w", err) } if submittedTxHash == nil { // this should not happen - log.Warn("missing the tx hash while the submitted txn position exists") - return false + return errors.New("missing the tx hash while the submitted txn position exists") } data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash) if err != nil { - log.Warn("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String()) - return false + return fmt.Errorf("failed to fetch the submitted transaction hash (hash: %s): %w", submittedTxHash.String(), err) } height := data.BlockHeight header, err := s.espressoClient.FetchHeaderByHeight(ctx, height) if err != nil { - log.Warn("could not get the header", "height", height, "err", err) - return false + return fmt.Errorf("could not get the header (height: %d): %w", height, err) } // Verify the namespace proof resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64()) if err != nil { - log.Warn("failed to fetch the transactions in block, will retry", "err", err) - return false + return fmt.Errorf("failed to fetch the transactions in block (height: %d): %w", height, err) } msgs := []arbostypes.L1IncomingMessage{} for _, p := range submittedTxnPos { msg, err := s.GetMessage(p) if err != nil { - log.Error("failed to get the message in tx streamer", "pos", p) - return false + return fmt.Errorf("failed to get the message in tx streamer (pos: %d): %w", p, err) } if msg.Message != nil { msgs = append(msgs, *msg.Message) } } - // Rebuild the hotshot payload with messages to check if it is finalizied payload, length := arbos.BuildHotShotPayload(&msgs) if length != len(msgs) { - log.Error("failed to rebuild the hotshot payload, it is expected rebuild the transaction within all messages") - return false + return errors.New("failed to rebuild the hotshot payload; the number of messages does not match the expected length") } - namespaceOk := espressocrypto.VerifyNamespace(s.chainConfig.ChainID.Uint64(), resp.Proof, *header.Header.GetPayloadCommitment(), *header.Header.GetNsTable(), []espressoTypes.Bytes{payload}, resp.VidCommon) + namespaceOk := espressocrypto.VerifyNamespace( + s.chainConfig.ChainID.Uint64(), + resp.Proof, + *header.Header.GetPayloadCommitment(), + *header.Header.GetNsTable(), + []espressoTypes.Bytes{payload}, + resp.VidCommon, + ) if !namespaceOk { - log.Error("error validating namespace proof", "height", height) - return false + return fmt.Errorf("error validating namespace proof (height: %d)", height) } - // Verify the merkle tree proof snapshot, err := s.lightClientReader.FetchMerkleRoot(height, nil) if err != nil { - log.Warn("could not get the merkle root", "height", height, "err", err) - return false + return fmt.Errorf("%w (height: %d): %w", EspressoFetchMerkleRootErr, height, err) } if snapshot.Height <= height { - return false + return errors.New("snapshot height is less than or equal to transaction height") } nextHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, snapshot.Height) if err != nil { - log.Warn("error fetching the snapshot header", "height", snapshot.Height, "err", err) - return false + return fmt.Errorf("error fetching the snapshot header (height: %d): %w", snapshot.Height, err) } proof, err := s.espressoClient.FetchBlockMerkleProof(ctx, snapshot.Height, height) if err != nil { - log.Warn("error fetching the block merkle proof", "height", height, "root height", snapshot.Height, "err", err) - return false + return fmt.Errorf("error fetching the block merkle proof (height: %d, root height: %d): %w", height, snapshot.Height, err) } blockMerkleTreeRoot := nextHeader.Header.GetBlockMerkleTreeRoot() jstHeader, err := json.Marshal(header) if err != nil { - log.Error("Failed to Marshal the header") - return false + return fmt.Errorf("failed to marshal the header: %w", err) } ok := espressocrypto.VerifyMerkleProof(proof.Proof, jstHeader, *blockMerkleTreeRoot, snapshot.Root) if !ok { - log.Error("error validating merkle proof", "height", height, "snapshot height", snapshot.Height) - return false + return fmt.Errorf("error validating merkle proof (height: %d, snapshot height: %d)", height, snapshot.Height) } // Validation completed. Update the database @@ -1365,30 +1358,22 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co defer s.espressoTxnsStateInsertionMutex.Unlock() batch := s.db.NewBatch() - err = s.setEspressoSubmittedPos(batch, nil) - if err != nil { - log.Warn("failed to set the submitted pos to nil", "err", err) - return false + if err := s.setEspressoSubmittedPos(batch, nil); err != nil { + return fmt.Errorf("failed to set the submitted pos to nil: %w", err) } - err = s.setEspressoSubmittedHash(batch, nil) - if err != nil { - log.Warn("failed to set the submitted hash to nil", "err", err) - return false + if err := s.setEspressoSubmittedHash(batch, nil); err != nil { + return fmt.Errorf("failed to set the submitted hash to nil: %w", err) } lastConfirmedPos := submittedTxnPos[len(submittedTxnPos)-1] - err = s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos) - if err != nil { - log.Warn("failed to set the last confirmed position", "err", err, "pos", lastConfirmedPos) - return false + if err := s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos); err != nil { + return fmt.Errorf("failed to set the last confirmed position (pos: %d): %w", lastConfirmedPos, err) } - err = batch.Write() - if err != nil { - log.Error("failed to write to db", "err", err) - return false + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write to db: %w", err) } - log.Info("Finality message", "pos", submittedTxnPos, "tx", submittedTxHash.String()) - return true + + return nil } func (s *TransactionStreamer) getEspressoSubmittedPos() ([]arbutil.MessageIndex, error) { @@ -1785,6 +1770,8 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error { return nil } +var espressoEphemeralErrorHandler = util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour) + func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct{}) time.Duration { retryRate := s.espressoTxnsPollingInterval * 50 enabledEspresso, err := s.isEspressoMode() @@ -1797,9 +1784,21 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct log.Error("error checking escape hatch", "err", err) return retryRate } - canSubmit := s.pollSubmittedTransactionForFinality(ctx) + err = s.pollSubmittedTransactionForFinality(ctx) + if err != nil { + if ctx.Err() != nil { + return 0 + } + logLevel := log.Error + logLevel = espressoEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel("error polling finality", "err", err) + return retryRate + } else { + espressoEphemeralErrorHandler.Reset() + } + shouldSubmit := s.shouldSubmitEspressoTransaction() - if canSubmit && shouldSubmit { + if shouldSubmit { return s.submitEspressoTransactions(ctx) }