From 78e14993cd3b70a42bd9f495e35a84026630a544 Mon Sep 17 00:00:00 2001 From: Jeremy Date: Mon, 2 Dec 2024 23:11:22 +0800 Subject: [PATCH] Clean up (#351) * Clean up * Remove changes to parse_l2.go * Clean up * remove attestation log --------- Co-authored-by: Sneh Koul --- arbnode/batch_poster.go | 11 ++- arbnode/transaction_streamer.go | 147 +++++++++++++++++++------------- arbos/parse_l2.go | 22 ----- execution/interface.go | 8 +- 4 files changed, 99 insertions(+), 89 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index dbe1b7b930..b1fab08dd0 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -557,6 +557,9 @@ func AccessList(opts *AccessListOpts) types.AccessList { return l } +var EspressoFetchMerkleRootErr = errors.New("failed to fetch the espresso merkle roof") +var EspressoFetchTransactionErr = errors.New("failed to fetch the espresso transaction") + // Adds a block merkle proof to an Espresso justification, providing a proof that a set of transactions // hashes to some light client state root. func (b *BatchPoster) checkEspressoValidation( @@ -587,8 +590,6 @@ func (b *BatchPoster) checkEspressoValidation( return nil } - log.Warn("this message has not been finalized on L1 or validated") - if b.streamer.UseEscapeHatch { skip, err := b.streamer.getSkipVerificationPos() if err != nil { @@ -612,7 +613,7 @@ func (b *BatchPoster) checkEspressoValidation( return nil } - return fmt.Errorf("waiting for espresso finalization, pos: %d", b.building.msgCount) + return fmt.Errorf("%w (height: %d)", EspressoFetchMerkleRootErr, b.building.msgCount) } func (b *BatchPoster) submitEspressoTransactionPos(pos arbutil.MessageIndex) error { @@ -1203,7 +1204,6 @@ func (b *BatchPoster) getAttestationQuote(userData []byte) ([]byte, error) { return []byte{}, fmt.Errorf("failed to read quote file: %w", err) } - log.Info("Attestation quote generated", "quote", hex.EncodeToString(attestationQuote)) return attestationQuote, nil } @@ -1815,12 +1815,14 @@ func (b *BatchPoster) Start(ctxIn context.Context) { storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute) normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute) accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute) + espressoEphemeralErrorHandler := util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour) resetAllEphemeralErrs := func() { commonEphemeralErrorHandler.Reset() exceedMaxMempoolSizeEphemeralErrorHandler.Reset() storageRaceEphemeralErrorHandler.Reset() normalGasEstimationFailedEphemeralErrorHandler.Reset() accumulatorNotFoundEphemeralErrorHandler.Reset() + espressoEphemeralErrorHandler.Reset() } b.CallIteratively(func(ctx context.Context) time.Duration { var err error @@ -1875,6 +1877,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = espressoEphemeralErrorHandler.LogLevel(err, logLevel) // If the error matches one of these, it's only logged at debug for the first minute, // then at warn for the next 4 minutes, then at error. If the error isn't one of these, // it'll be logged at warn for the first minute, then at error. diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 00f04eb11d..59ebeff2b9 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -19,6 +19,7 @@ import ( lightclient "github.com/EspressoSystems/espresso-sequencer-go/light-client" tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64" "github.com/offchainlabs/nitro/espressocrypto" + "github.com/offchainlabs/nitro/util" espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client" espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types" @@ -32,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" flag "github.com/spf13/pflag" - "github.com/offchainlabs/nitro/arbos" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcaster" @@ -1257,107 +1257,99 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc // Check if the latest submitted transaction has been finalized on L1 and verify it. // Return a bool indicating whether a new transaction can be submitted to HotShot -func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) bool { +func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) error { submittedTxnPos, err := s.getEspressoSubmittedPos() if err != nil { - log.Warn("submitted pos not found", "err", err) - return false + return fmt.Errorf("submitted pos not found: %w", err) } if len(submittedTxnPos) == 0 { - return true // no submitted transaction + return nil // no submitted transaction, treated as successful } + submittedTxHash, err := s.getEspressoSubmittedHash() if err != nil { - log.Warn("submitted hash not found", "err", err) - return false + return fmt.Errorf("submitted hash not found: %w", err) } if submittedTxHash == nil { // this should not happen - log.Warn("missing the tx hash while the submitted txn position exists") - return false + return errors.New("missing the tx hash while the submitted txn position exists") } data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash) if err != nil { - log.Warn("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String()) - return false + return fmt.Errorf("failed to fetch the submitted transaction hash (hash: %s): %w", submittedTxHash.String(), err) } height := data.BlockHeight header, err := s.espressoClient.FetchHeaderByHeight(ctx, height) if err != nil { - log.Warn("could not get the header", "height", height, "err", err) - return false + return fmt.Errorf("could not get the header (height: %d): %w", height, err) } // Verify the namespace proof resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64()) if err != nil { - log.Warn("failed to fetch the transactions in block, will retry", "err", err) - return false + return fmt.Errorf("failed to fetch the transactions in block (height: %d): %w", height, err) } msgs := []arbostypes.L1IncomingMessage{} for _, p := range submittedTxnPos { msg, err := s.GetMessage(p) if err != nil { - log.Error("failed to get the message in tx streamer", "pos", p) - return false + return fmt.Errorf("failed to get the message in tx streamer (pos: %d): %w", p, err) } if msg.Message != nil { msgs = append(msgs, *msg.Message) } } - // Rebuild the hotshot payload with messages to check if it is finalizied - payload, length := arbos.BuildHotShotPayload(&msgs) + payload, length := buildHotShotPayload(&msgs) if length != len(msgs) { - log.Error("failed to rebuild the hotshot payload, it is expected rebuild the transaction within all messages") - return false + return errors.New("failed to rebuild the hotshot payload; the number of messages does not match the expected length") } - namespaceOk := espressocrypto.VerifyNamespace(s.chainConfig.ChainID.Uint64(), resp.Proof, *header.Header.GetPayloadCommitment(), *header.Header.GetNsTable(), []espressoTypes.Bytes{payload}, resp.VidCommon) + namespaceOk := espressocrypto.VerifyNamespace( + s.chainConfig.ChainID.Uint64(), + resp.Proof, + *header.Header.GetPayloadCommitment(), + *header.Header.GetNsTable(), + []espressoTypes.Bytes{payload}, + resp.VidCommon, + ) if !namespaceOk { - log.Error("error validating namespace proof", "height", height) - return false + return fmt.Errorf("error validating namespace proof (height: %d)", height) } - // Verify the merkle tree proof snapshot, err := s.lightClientReader.FetchMerkleRoot(height, nil) if err != nil { - log.Warn("could not get the merkle root", "height", height, "err", err) - return false + return fmt.Errorf("%w (height: %d): %w", EspressoFetchMerkleRootErr, height, err) } if snapshot.Height <= height { - return false + return errors.New("snapshot height is less than or equal to transaction height") } nextHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, snapshot.Height) if err != nil { - log.Warn("error fetching the snapshot header", "height", snapshot.Height, "err", err) - return false + return fmt.Errorf("error fetching the snapshot header (height: %d): %w", snapshot.Height, err) } proof, err := s.espressoClient.FetchBlockMerkleProof(ctx, snapshot.Height, height) if err != nil { - log.Warn("error fetching the block merkle proof", "height", height, "root height", snapshot.Height, "err", err) - return false + return fmt.Errorf("error fetching the block merkle proof (height: %d, root height: %d): %w", height, snapshot.Height, err) } blockMerkleTreeRoot := nextHeader.Header.GetBlockMerkleTreeRoot() jstHeader, err := json.Marshal(header) if err != nil { - log.Error("Failed to Marshal the header") - return false + return fmt.Errorf("failed to marshal the header: %w", err) } ok := espressocrypto.VerifyMerkleProof(proof.Proof, jstHeader, *blockMerkleTreeRoot, snapshot.Root) if !ok { - log.Error("error validating merkle proof", "height", height, "snapshot height", snapshot.Height) - return false + return fmt.Errorf("error validating merkle proof (height: %d, snapshot height: %d)", height, snapshot.Height) } // Validation completed. Update the database @@ -1365,30 +1357,22 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co defer s.espressoTxnsStateInsertionMutex.Unlock() batch := s.db.NewBatch() - err = s.setEspressoSubmittedPos(batch, nil) - if err != nil { - log.Warn("failed to set the submitted pos to nil", "err", err) - return false + if err := s.setEspressoSubmittedPos(batch, nil); err != nil { + return fmt.Errorf("failed to set the submitted pos to nil: %w", err) } - err = s.setEspressoSubmittedHash(batch, nil) - if err != nil { - log.Warn("failed to set the submitted hash to nil", "err", err) - return false + if err := s.setEspressoSubmittedHash(batch, nil); err != nil { + return fmt.Errorf("failed to set the submitted hash to nil: %w", err) } lastConfirmedPos := submittedTxnPos[len(submittedTxnPos)-1] - err = s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos) - if err != nil { - log.Warn("failed to set the last confirmed position", "err", err, "pos", lastConfirmedPos) - return false + if err := s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos); err != nil { + return fmt.Errorf("failed to set the last confirmed position (pos: %d): %w", lastConfirmedPos, err) } - err = batch.Write() - if err != nil { - log.Error("failed to write to db", "err", err) - return false + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write to db: %w", err) } - log.Info("Finality message", "pos", submittedTxnPos, "tx", submittedTxHash.String()) - return true + + return nil } func (s *TransactionStreamer) getEspressoSubmittedPos() ([]arbutil.MessageIndex, error) { @@ -1639,13 +1623,13 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti msgs = append(msgs, *msg.Message) } } - payload, msgCnt := arbos.BuildHotShotPayload(&msgs) + payload, msgCnt := buildHotShotPayload(&msgs) if msgCnt == 0 { log.Error("failed to build the hotshot transaction: a large message has exceeded the size limit") return s.espressoTxnsPollingInterval } - log.Info("submitting transaction to espresso using sovereign sequencer") + log.Info("submitting transaction to hotshot for finalization") // Note: same key should not be used for two namespaces for this to work hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{ @@ -1785,6 +1769,16 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error { return nil } +var espressoMerkleProofEphemeralErrorHandler = util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour) +var espressoTransactionEphemeralErrorHandler = util.NewEphemeralErrorHandler(3*time.Minute, EspressoFetchTransactionErr.Error(), time.Minute) + +func getLogLevel(err error) func(string, ...interface{}) { + logLevel := log.Error + logLevel = espressoMerkleProofEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = espressoTransactionEphemeralErrorHandler.LogLevel(err, logLevel) + return logLevel +} + func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct{}) time.Duration { retryRate := s.espressoTxnsPollingInterval * 50 enabledEspresso, err := s.isEspressoMode() @@ -1794,12 +1788,27 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct if enabledEspresso { err := s.toggleEscapeHatch(ctx) if err != nil { - log.Error("error checking escape hatch", "err", err) + if ctx.Err() != nil { + return 0 + } + logLevel := getLogLevel(err) + logLevel("error checking escape hatch, will retry", "err", err) + return retryRate + } + err = s.pollSubmittedTransactionForFinality(ctx) + if err != nil { + if ctx.Err() != nil { + return 0 + } + logLevel := getLogLevel(err) + logLevel("error polling finality", "err", err) return retryRate + } else { + espressoMerkleProofEphemeralErrorHandler.Reset() } - canSubmit := s.pollSubmittedTransactionForFinality(ctx) + shouldSubmit := s.shouldSubmitEspressoTransaction() - if canSubmit && shouldSubmit { + if shouldSubmit { return s.submitEspressoTransactions(ctx) } @@ -1823,3 +1832,23 @@ func (s *TransactionStreamer) Start(ctxIn context.Context) error { return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier) } + +const ESPRESSO_TRANSACTION_SIZE_LIMIT int = 10 * 1024 + +func buildHotShotPayload(msgs *[]arbostypes.L1IncomingMessage) (espressoTypes.Bytes, int) { + payload := []byte{} + msgCnt := 0 + + sizeBuf := make([]byte, 8) + for _, msg := range *msgs { + if len(payload) >= ESPRESSO_TRANSACTION_SIZE_LIMIT { + break + } + msgByte := msg.L2msg + binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgByte))) + payload = append(payload, sizeBuf...) + payload = append(payload, msgByte...) + msgCnt += 1 + } + return payload, msgCnt +} diff --git a/arbos/parse_l2.go b/arbos/parse_l2.go index 9d372ff803..06722e4063 100644 --- a/arbos/parse_l2.go +++ b/arbos/parse_l2.go @@ -2,14 +2,12 @@ package arbos import ( "bytes" - "encoding/binary" "errors" "fmt" "io" "math/big" "time" - espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -19,8 +17,6 @@ import ( "github.com/offchainlabs/nitro/util/arbmath" ) -const ESPRESSO_TRANSACTION_SIZE_LIMIT int = 10 * 1024 - func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int) (types.Transactions, error) { if len(msg.L2msg) > arbostypes.MaxL2MessageSize { // ignore the message if l2msg is too large @@ -397,21 +393,3 @@ func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasC // don't need to fill in the other fields, since they exist only to ensure uniqueness, and batchNum is already unique }), nil } - -func BuildHotShotPayload(msgs *[]arbostypes.L1IncomingMessage) (espressoTypes.Bytes, int) { - payload := []byte{} - msgCnt := 0 - - sizeBuf := make([]byte, 8) - for _, msg := range *msgs { - if len(payload) >= ESPRESSO_TRANSACTION_SIZE_LIMIT { - break - } - msgByte := msg.L2msg - binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgByte))) - payload = append(payload, sizeBuf...) - payload = append(payload, msgByte...) - msgCnt += 1 - } - return payload, msgCnt -} diff --git a/execution/interface.go b/execution/interface.go index 718b497f7a..bfae28e9dd 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -3,9 +3,10 @@ package execution import ( "context" "errors" - "github.com/ethereum/go-ethereum/params" "testing" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/offchainlabs/nitro/arbos/arbostypes" @@ -13,9 +14,8 @@ import ( ) type MessageResult struct { - BlockHash common.Hash - SendRoot common.Hash - HotShotHeight uint64 + BlockHash common.Hash + SendRoot common.Hash } type RecordResult struct {