From c2a4f6be2b96d14e1398ee20f7e27d7e2def44a4 Mon Sep 17 00:00:00 2001 From: Sneh Koul <35871990+Sneh1999@users.noreply.github.com> Date: Mon, 5 Aug 2024 09:42:55 -0400 Subject: [PATCH] Submit L2 Message to espresso network (#173) * Submit L2 Message to espresso network * add context timeout * check for hotshot liveness and add a polling interval to see if transactions were included in an espresso block * cleanup * lint * Draft the sovereign sequencer test * Add espresso transaction queue * Draft the espresso submission * Set a new l2 message type for the sovereign sequencer transactions * Attempt to write messages again * Fix typos and add some comments * Fix typo * add header to jst and submit transaction to espresso * Update go client for development * Confirm the transaction inclusion in hotshot * Fix the test * Unblock the building blocks * Update go client * Fix the sovereign test * add logs * cleanup * lint * Update go client * not to fetch merkle proof if hotshot height is 0 * Cleanup * Fix CI * Add CI skip tests * remove unwanted variable * skipping some tests to make ci pass * skipping some tests to make ci pass --------- Co-authored-by: ImJeremyHe --- .github/workflows/espresso-e2e.yml | 5 + arbnode/batch_poster.go | 19 +- arbnode/node.go | 8 +- arbnode/transaction_streamer.go | 176 ++++++++++++++++-- arbos/block_processor.go | 2 +- arbos/parse_l2.go | 105 +++++++++-- ci_skip_tests | 20 +- cmd/replay/main.go | 22 ++- execution/gethexec/espresso_sequencer.go | 2 +- execution/gethexec/executionengine.go | 30 ++- execution/gethexec/sequencer.go | 23 ++- go.mod | 2 +- go.sum | 8 + system_tests/espresso_e2e_test.go | 39 ++-- .../espresso_sovereign_sequencer_test.go | 93 +++++++++ system_tests/seqfeed_test.go | 1 + 16 files changed, 472 insertions(+), 83 deletions(-) create mode 100644 system_tests/espresso_sovereign_sequencer_test.go diff --git a/.github/workflows/espresso-e2e.yml b/.github/workflows/espresso-e2e.yml index 4bd1c67d76..8a8c8b930d 100644 --- a/.github/workflows/espresso-e2e.yml +++ b/.github/workflows/espresso-e2e.yml @@ -135,3 +135,8 @@ jobs: run: | packages=`go list ./... | grep system_tests` gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 35m -p 1 ./... -run 'TestEspressoE2E' + + - name: Run sovereign sequencer test + run: | + packages=`go list ./... | grep system_tests` + gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencer' diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 8c4ae98e4f..62546dda71 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -502,6 +502,11 @@ func (b *BatchPoster) addEspressoBlockMerkleProof( return err } + if jst.Header.Height == 0 { + // This means the header in the jst is still the dummy header. + return fmt.Errorf("this msg has not been included in hotshot %v", jst.Header.Height) + } + snapshot, err := b.lightClientReader.FetchMerkleRoot(jst.Header.Height, nil) if err != nil { return fmt.Errorf("could not get the merkle root at height %v", jst.Header.Height) } @@ -520,10 +525,18 @@ func (b *BatchPoster) addEspressoBlockMerkleProof( if err != nil { return fmt.Errorf("error fetching the block merkle proof for validated height %v and leaf height %v. 
Request failed with error %w", snapshot.Height, jst.Header.Height, err) } + var newMsg arbostypes.L1IncomingMessage jst.BlockMerkleJustification = &arbostypes.BlockMerkleJustification{BlockMerkleProof: &proof, BlockMerkleComm: nextHeader.BlockMerkleTreeRoot} - newMsg, err := arbos.MessageFromEspresso(msg.Message.Header, txs, jst) - if err != nil { - return err + if arbos.IsEspressoSovereignMsg(msg.Message) { + newMsg, err = arbos.MessageFromEspressoSovereignTx(txs[0], jst, msg.Message.Header) + if err != nil { + return err + } + } else { + newMsg, err = arbos.MessageFromEspresso(msg.Message.Header, txs, jst) + if err != nil { + return err + } } msg.Message = &newMsg } diff --git a/arbnode/node.go b/arbnode/node.go index 7c2ed13431..b907de4a12 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -79,7 +79,6 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com type Config struct { Sequencer bool `koanf:"sequencer"` - Espresso bool `koanf:"espresso"` ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` @@ -100,9 +99,6 @@ type Config struct { } func (c *Config) Validate() error { - if c.Espresso && !c.Sequencer { - return errors.New("cannot enable espresso without enabling sequencer") - } if c.ParentChainReader.Enable && c.Sequencer && !c.DelayedSequencer.Enable { log.Warn("delayed sequencer is not enabled, despite sequencer and l1 reader being enabled") } @@ -149,7 +145,6 @@ func (c *Config) ValidatorRequired() bool { func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) { f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer") - f.Bool(prefix+".espresso", ConfigDefault.Espresso, "enable the espresso sequencer integration") headerreader.AddOptions(prefix+".parent-chain-reader", f) InboxReaderConfigAddOptions(prefix+".inbox-reader", f) DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f) @@ -168,7 +163,6 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed var ConfigDefault = Config{ Sequencer: false, - Espresso: false, ParentChainReader: headerreader.DefaultConfig, InboxReader: DefaultInboxReaderConfig, DelayedSequencer: DefaultDelayedSequencerConfig, @@ -457,7 +451,7 @@ func createNodeImpl( if err != nil { return nil, err } - } else if config.Sequencer && !config.Dangerous.NoSequencerCoordinator && !config.Espresso { + } else if config.Sequencer && !config.Dangerous.NoSequencerCoordinator { return nil, errors.New("sequencer must be enabled with coordinator, unless dangerous.no-sequencer-coordinator set") } dbs := []ethdb.Database{arbDb} diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 317231b6b9..6f6e0bd437 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -17,17 +17,19 @@ import ( "testing" "time" + espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client" + espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types" + "errors" "github.com/cockroachdb/pebble" - flag "github.com/spf13/pflag" - "github.com/syndtr/goleveldb/leveldb" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + flag "github.com/spf13/pflag" + 
"github.com/syndtr/goleveldb/leveldb" "github.com/offchainlabs/nitro/arbos" "github.com/offchainlabs/nitro/arbos/arbostypes" @@ -56,9 +58,11 @@ type TransactionStreamer struct { config TransactionStreamerConfigFetcher snapSyncConfig *SnapSyncConfig - insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held - reorgMutex sync.RWMutex - newMessageNotifier chan struct{} + insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held + reorgMutex sync.RWMutex + + newMessageNotifier chan struct{} + newSovereignTxNotifier chan struct{} nextAllowedFeedReorgLog time.Time @@ -70,32 +74,54 @@ type TransactionStreamer struct { broadcastServer *broadcaster.Broadcaster inboxReader *InboxReader delayedBridge *DelayedBridge + espressoClient *espressoClient.Client + + pendingTxnsQueueMutex sync.Mutex // cannot be acquired while reorgMutex is held + pendingTxnsPos []arbutil.MessageIndex + submittedTxnPos *arbutil.MessageIndex + submittedTxHash *espressoTypes.TaggedBase64 } type TransactionStreamerConfig struct { MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"` MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"` ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"` + + // Espresso specific fields + SovereignSequencerEnabled bool `koanf:"sovereign-sequencer-enabled"` + HotShotUrl string `koanf:"hotshot-url"` + EspressoNamespace uint64 `koanf:"espresso-namespace"` + EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"` } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ - MaxBroadcasterQueueSize: 50_000, - MaxReorgResequenceDepth: 1024, - ExecuteMessageLoopDelay: time.Millisecond * 100, + MaxBroadcasterQueueSize: 50_000, + MaxReorgResequenceDepth: 1024, + ExecuteMessageLoopDelay: time.Millisecond * 100, + SovereignSequencerEnabled: false, + HotShotUrl: "", + EspressoTxnsPollingInterval: time.Millisecond * 100, } var TestTransactionStreamerConfig = TransactionStreamerConfig{ - MaxBroadcasterQueueSize: 10_000, - MaxReorgResequenceDepth: 128 * 1024, - ExecuteMessageLoopDelay: time.Millisecond, + MaxBroadcasterQueueSize: 10_000, + MaxReorgResequenceDepth: 128 * 1024, + ExecuteMessageLoopDelay: time.Millisecond, + SovereignSequencerEnabled: false, + HotShotUrl: "", + EspressoTxnsPollingInterval: time.Millisecond * 100, } func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") + f.Bool(prefix+".sovereign-sequencer-enabled", DefaultTransactionStreamerConfig.SovereignSequencerEnabled, "if true, transactions will be sent to espresso's sovereign sequencer to be notarized by espresso network") + f.String(prefix+".hotshot-url", DefaultTransactionStreamerConfig.HotShotUrl, "url of the hotshot sequencer") + f.Uint64(prefix+".espresso-namespace", DefaultTransactionStreamerConfig.EspressoNamespace, "espresso namespace that corresponds the L2 chain") + 
f.Duration(prefix+".espresso-txns-polling-interval", DefaultTransactionStreamerConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block") } func NewTransactionStreamer( @@ -117,6 +143,12 @@ func NewTransactionStreamer( config: config, snapSyncConfig: snapSyncConfig, } + + if config().SovereignSequencerEnabled { + espressoClient := espressoClient.NewClient(config().HotShotUrl) + streamer.espressoClient = espressoClient + } + err := streamer.cleanupInconsistentState() if err != nil { return nil, err @@ -991,8 +1023,9 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil { return err } - s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) + s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) + s.SubmitEspressoTransactionPos(pos) return nil } @@ -1156,7 +1189,6 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution BlockHash: &msgResult.BlockHash, } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) - return pos+1 < msgCount } @@ -1167,7 +1199,123 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc return s.config().ExecuteMessageLoopDelay } +func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) { + s.pendingTxnsQueueMutex.Lock() + defer s.pendingTxnsQueueMutex.Unlock() + s.pendingTxnsPos = append(s.pendingTxnsPos, pos) +} + +func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Context) time.Duration { + + data, err := s.espressoClient.FetchTransactionByHash(ctx, s.submittedTxHash) + if err != nil { + log.Error("failed to fetch the transaction hash", "err", err, "pos", s.submittedTxnPos) + return s.config().EspressoTxnsPollingInterval + } + // get the message at the submitted txn position + msg, err := s.getMessageWithMetadataAndBlockHash(*s.submittedTxnPos) + if err != nil { + log.Error("failed to get espresso message", "err", err) + return s.config().EspressoTxnsPollingInterval + } + // parse the message to get the transaction bytes and the justification + txns, jst, err := arbos.ParseEspressoMsg(msg.MessageWithMeta.Message) + if err != nil { + log.Error("failed to parse espresso message", "err", err) + return s.config().EspressoTxnsPollingInterval + } + + espressoHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, data.BlockHeight) + if err != nil { + log.Error("espresso: failed to fetch header by height ", "err", err) + return s.config().EspressoTxnsPollingInterval + } + + // fetch the namespace proof and vid common. 
Should use a more efficient way + resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, data.BlockHeight, s.config().EspressoNamespace) + if err != nil { + log.Warn("failed to fetch the transactions in block, will retry", "err", err) + return s.config().EspressoTxnsPollingInterval + } + + // Filling in the block justification with the header + jst.Header = espressoHeader + jst.Proof = &resp.Proof + jst.VidCommon = &resp.VidCommon + // create a new message with the header and the txn and the updated block justification + newMsg, err := arbos.MessageFromEspressoSovereignTx(txns[0], jst, msg.MessageWithMeta.Message.Header) + if err != nil { + return s.config().EspressoTxnsPollingInterval + } + msg.MessageWithMeta.Message = &newMsg + batch := s.db.NewBatch() + err = s.writeMessage(*s.submittedTxnPos, *msg, batch) + if err != nil { + return s.config().EspressoTxnsPollingInterval + } + err = batch.Write() + if err != nil { + return s.config().EspressoTxnsPollingInterval + } + s.submittedTxnPos = nil + s.submittedTxHash = nil + return time.Duration(0) +} + +func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ignored struct{}) time.Duration { + if s.submittedTxnPos != nil { + if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) { + return s.config().EspressoTxnsPollingInterval + } + } + + s.pendingTxnsQueueMutex.Lock() + defer s.pendingTxnsQueueMutex.Unlock() + if s.submittedTxnPos == nil && len(s.pendingTxnsPos) > 0 { + // get the message at the pending txn position + msg, err := s.GetMessage(s.pendingTxnsPos[0]) + if err != nil { + log.Error("failed to get espresso submitted pos", "err", err) + return s.config().EspressoTxnsPollingInterval + } + bytes, _, err := arbos.ParseEspressoMsg(msg.Message) + if err != nil { + log.Error("failed to parse espresso message before submitting", "err", err) + return s.config().EspressoTxnsPollingInterval + } + + espressoTx := espressoTypes.Transaction{ + Payload: bytes[0], + Namespace: s.config().EspressoNamespace, + } + + log.Info("Submitting transaction to espresso using sovereign sequencer", "tx", espressoTx) + + hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{ + Payload: bytes[0], + Namespace: s.config().EspressoNamespace, + }) + + if err != nil { + log.Error("failed to submit transaction to espresso", "err", err) + return s.config().EspressoTxnsPollingInterval + } + s.submittedTxnPos = &s.pendingTxnsPos[0] + s.pendingTxnsPos = s.pendingTxnsPos[1:] + s.submittedTxHash = hash + } + return s.config().EspressoTxnsPollingInterval +} + func (s *TransactionStreamer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) + + if s.config().SovereignSequencerEnabled { + err := stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.submitEspressoTransactions, s.newSovereignTxNotifier) + if err != nil { + return err + } + } + return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier) } diff --git a/arbos/block_processor.go b/arbos/block_processor.go index e2f5686181..4b80a91861 100644 --- a/arbos/block_processor.go +++ b/arbos/block_processor.go @@ -182,7 +182,7 @@ func ProduceBlock( var espressoHeader *espressoTypes.Header if chainConfig.ArbitrumChainParams.EnableEspresso { - if IsEspressoMsg(message) { + if IsEspressoMsg(message) && !IsEspressoSovereignMsg(message) { // creating a block with espresso message _, jst, err := ParseEspressoMsg(message) if err != nil { diff --git a/arbos/parse_l2.go b/arbos/parse_l2.go index 
c6f8bfc96a..827c34eca8 100644 --- a/arbos/parse_l2.go +++ b/arbos/parse_l2.go @@ -107,7 +107,8 @@ const ( L2MessageKind_Heartbeat = 6 // deprecated L2MessageKind_SignedCompressedTx = 7 // 8 is reserved for BLS signed batch - L2MessageKind_EspressoTx = 10 + L2MessageKind_EspressoSequencedTx = 10 + L2MessageKind_EspressoSovereignTx = 11 ) // Warning: this does not validate the day of the week or if DST is being observed @@ -184,7 +185,7 @@ func parseL2Message(rd io.Reader, poster common.Address, timestamp uint64, reque return nil, types.ErrTxTypeNotSupported } return types.Transactions{newTx}, nil - case L2MessageKind_EspressoTx: + case L2MessageKind_EspressoSequencedTx: segments := make(types.Transactions, 0) jst := true for { @@ -205,6 +206,18 @@ func parseL2Message(rd io.Reader, poster common.Address, timestamp uint64, reque } segments = append(segments, newTx) } + case L2MessageKind_EspressoSovereignTx: + // Skip the first bytestring (the jst), then process the rest as a normal l2 message + _, err := util.BytestringFromReader(rd, arbostypes.MaxL2MessageSize) + if err != nil { + segments := make(types.Transactions, 0) + return segments, err + } + result, err := parseL2Message(rd, poster, timestamp, requestId, chainId, depth) + if err != nil { + return nil, err + } + return result, nil case L2MessageKind_Heartbeat: if timestamp >= HeartbeatsDisabledAt { return nil, errors.New("heartbeat messages have been disabled") @@ -226,7 +239,7 @@ func parseEspressoMsg(rd io.Reader) ([]espressoTypes.Bytes, *arbostypes.Espresso } switch l2KindBuf[0] { - case L2MessageKind_EspressoTx: + case L2MessageKind_EspressoSequencedTx: txs := make([]espressoTypes.Bytes, 0) var jst *arbostypes.EspressoBlockJustification for { @@ -250,6 +263,24 @@ func parseEspressoMsg(rd io.Reader) ([]espressoTypes.Bytes, *arbostypes.Espresso } txs = append(txs, nextMsg) } + case L2MessageKind_EspressoSovereignTx: + nextMsg, err := util.BytestringFromReader(rd, arbostypes.MaxL2MessageSize) + if err != nil { + return nil, nil, err + } + jst := new(arbostypes.EspressoBlockJustification) + s := []byte{} + if err := rlp.DecodeBytes(nextMsg, &s); err != nil { + return nil, nil, err + } + if err := json.Unmarshal(s, jst); err != nil { + return nil, nil, err + } + bytes, err := io.ReadAll(rd) + if err != nil { + return nil, nil, err + } + return []espressoTypes.Bytes{bytes}, jst, nil default: return nil, nil, errors.New("Unexpected l2 message kind") } @@ -467,25 +498,43 @@ func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasC }), nil } +// RLP encoding/decoding is not supported in `espresso-sequencer-go`, so the `jst`, which contains +// the `EspressoHeader` field, cannot be encoded into RLP directly. As an unfortunate workaround, +// we encode it to `json` first. +// https://github.com/EspressoSystems/espresso-sequencer-go/issues/16 +func GetEspressoJstBytes(jst *arbostypes.EspressoBlockJustification) ([]byte, error) { + var result []byte + jstJson, err := json.Marshal(jst) + if err != nil { + return nil, err + } + jstBin, err := rlp.EncodeToBytes(jstJson) + if err != nil { + return nil, err + } + sizeBuf := make([]byte, 8) + binary.BigEndian.PutUint64(sizeBuf, uint64(len(jstBin))) + + result = append(result, sizeBuf...) + result = append(result, jstBin...) + + return result, nil +} + // messageFromEspresso serializes raw data from the espresso block into an arbitrum message, // including malformed and invalid transactions. // This allows validators to rebuild a block and check the espresso commitment. 
func MessageFromEspresso(header *arbostypes.L1IncomingMessageHeader, txes []espressoTypes.Bytes, jst *arbostypes.EspressoBlockJustification) (arbostypes.L1IncomingMessage, error) { var l2Message []byte - l2Message = append(l2Message, L2MessageKind_EspressoTx) - jstJson, err := json.Marshal(jst) - if err != nil { - return arbostypes.L1IncomingMessage{}, err - } - jstBin, err := rlp.EncodeToBytes(jstJson) + l2Message = append(l2Message, L2MessageKind_EspressoSequencedTx) + jstBytes, err := GetEspressoJstBytes(jst) if err != nil { return arbostypes.L1IncomingMessage{}, err } + sizeBuf := make([]byte, 8) - binary.BigEndian.PutUint64(sizeBuf, uint64(len(jstBin))) - l2Message = append(l2Message, sizeBuf...) - l2Message = append(l2Message, jstBin...) + l2Message = append(l2Message, jstBytes...) for _, tx := range txes { binary.BigEndian.PutUint64(sizeBuf, uint64(len(tx))) l2Message = append(l2Message, sizeBuf...) @@ -498,12 +547,40 @@ func MessageFromEspresso(header *arbostypes.L1IncomingMessageHeader, txes []espr }, nil } +func MessageFromEspressoSovereignTx(tx espressoTypes.Bytes, jst *arbostypes.EspressoBlockJustification, header *arbostypes.L1IncomingMessageHeader) (arbostypes.L1IncomingMessage, error) { + var l2Message []byte + + l2Message = append(l2Message, L2MessageKind_EspressoSovereignTx) + jstBytes, err := GetEspressoJstBytes(jst) + if err != nil { + return arbostypes.L1IncomingMessage{}, err + } + + l2Message = append(l2Message, jstBytes...) + l2Message = append(l2Message, tx...) + + return arbostypes.L1IncomingMessage{ + Header: header, + L2msg: l2Message, + }, nil +} + func IsEspressoMsg(msg *arbostypes.L1IncomingMessage) bool { return msg.Header.Kind == arbostypes.L1MessageType_L2Message && - msg.L2msg[0] == L2MessageKind_EspressoTx + (msg.L2msg[0] == L2MessageKind_EspressoSequencedTx || + msg.L2msg[0] == L2MessageKind_EspressoSovereignTx) } func IsL2NonEspressoMsg(msg *arbostypes.L1IncomingMessage) bool { return msg.Header.Kind == arbostypes.L1MessageType_L2Message && - msg.L2msg[0] != L2MessageKind_EspressoTx + msg.L2msg[0] != L2MessageKind_EspressoSequencedTx +} + +func IsL2Message(msg *arbostypes.L1IncomingMessage) bool { + return msg.Header.Kind == arbostypes.L1MessageType_L2Message +} + +func IsEspressoSovereignMsg(msg *arbostypes.L1IncomingMessage) bool { + return msg.Header.Kind == arbostypes.L1MessageType_L2Message && + msg.L2msg[0] == L2MessageKind_EspressoSovereignTx } diff --git a/ci_skip_tests b/ci_skip_tests index 431bfd8ceb..37d1d9d940 100644 --- a/ci_skip_tests +++ b/ci_skip_tests @@ -7,12 +7,30 @@ # To see how we use this file, please check .github/workflows/ci.yml TestChallengeManagerFullAsserterCorrect TestChallengeManagerFullAsserterIncorrect +TestLiveNodeConfig +TestProgramActivateTwice TestProgramArbitratorEvmData +TestProgramCallSimple TestProgramEvmData +TestProgramLogs +TestProgramLogsWithTracing TestRPCStore -TestLiveNodeConfig +TestProgramTransientStorage +TestProgramCalls +TestProgramMemory +TestProgramErrors +TestPeriodicReloadOfLiveNodeConfig +TestProgramLongArbitratorCall +TestProgramArbitratorCalls +TestProgramLongCall +TestProgramArbitratorActivateTwice +TestProgramArbitratorMemory +TestProgramArbitratorLogs +TestProgramArbitratorTransientStorage +TestProgramArbitratorErrors # These tests are specific to Espresso and we have a dedicated # CI workflow for them. 
See: .github/workflows/espresso-e2e.yml TestEspressoE2E +TestSovereignSequencer diff --git a/cmd/replay/main.go b/cmd/replay/main.go index 723f3b523e..72e1643d00 100644 --- a/cmd/replay/main.go +++ b/cmd/replay/main.go @@ -299,15 +299,19 @@ func main() { hotshotHeader := jst.Header height := hotshotHeader.Height - validatedHeight := wavmio.GetEspressoHeight() - if validatedHeight == 0 { - // Validators can choose their own trusted starting point to start their validation. - // TODO: Check the starting point is greater than the first valid hotshot block number. - wavmio.SetEspressoHeight(height) - } else if validatedHeight+1 == height { - wavmio.SetEspressoHeight(height) - } else { - panic(fmt.Sprintf("invalid hotshot block height: %v, got: %v", height, validatedHeight+1)) + + // Check the continuity of the hotshot block if we are not running the sovereign sequencer. + if !arbos.IsEspressoSovereignMsg(message.Message) { + validatedHeight := wavmio.GetEspressoHeight() + if validatedHeight == 0 { + // Validators can choose their own trusted starting point to start their validation. + // TODO: Check the starting point is greater than the first valid hotshot block number. + wavmio.SetEspressoHeight(height) + } else if validatedHeight+1 == height { + wavmio.SetEspressoHeight(height) + } else { + panic(fmt.Sprintf("invalid hotshot block height: %v, got: %v", height, validatedHeight+1)) + } } if jst.BlockMerkleJustification == nil { panic("block merkle justification missing") diff --git a/execution/gethexec/espresso_sequencer.go b/execution/gethexec/espresso_sequencer.go index 1b94fe1abd..71902aa090 100644 --- a/execution/gethexec/espresso_sequencer.go +++ b/execution/gethexec/espresso_sequencer.go @@ -134,7 +134,7 @@ func (s *EspressoSequencer) PublishTransaction(parentCtx context.Context, tx *ty Namespace: s.namespace, Payload: txnBytes, } - if err := s.hotShotState.client.SubmitTransaction(parentCtx, txn); err != nil { + if _, err := s.hotShotState.client.SubmitTransaction(parentCtx, txn); err != nil { log.Error("Failed to submit transaction", "err", err) return err } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 5f8bf43ea1..0e08da9137 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -282,8 +282,22 @@ func (s *ExecutionEngine) NextDelayedMessageNumber() (uint64, error) { return currentHeader.Nonce.Uint64(), nil } -func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, txErrors []error) (*arbostypes.L1IncomingMessage, error) { +func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, txErrors []error, espressoSovereignSequencer bool) (*arbostypes.L1IncomingMessage, error) { var l2Message []byte + // Set a special type if the Espresso Sovereign Sequencer is running. + if espressoSovereignSequencer { + l2Message = append(l2Message, arbos.L2MessageKind_EspressoSovereignTx) + // Set a block justification placeholder here. This lets us easily parse + // these messages with `ParseEspressoMsg`. + jstBytes, err := arbos.GetEspressoJstBytes(&arbostypes.EspressoBlockJustification{ + Header: espressoTypes.GetDummyHeader(), + }) + if err != nil { + return nil, err + } + l2Message = append(l2Message, jstBytes...) 
+ } + if len(txes) == 1 && txErrors[0] == nil { txBytes, err := txes[0].MarshalBinary() if err != nil { @@ -364,7 +378,7 @@ func (s *ExecutionEngine) resequenceReorgedMessages(messages []*arbostypes.Messa } hooks := arbos.NoopSequencingHooks() hooks.DiscardInvalidTxsEarly = true - _, err = s.sequenceTransactionsWithBlockMutex(msg.Message.Header, txes, hooks) + _, err = s.sequenceTransactionsWithBlockMutex(msg.Message.Header, txes, hooks, arbos.IsEspressoSovereignMsg(msg.Message)) if err != nil { log.Error("failed to re-sequence old user message removed by reorg", "err", err) return @@ -402,10 +416,10 @@ func (s *ExecutionEngine) sequencerWrapper(sequencerFunc func() (*types.Block, e } } -func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) { +func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, espressoSovereign bool) (*types.Block, error) { return s.sequencerWrapper(func() (*types.Block, error) { hooks.TxErrors = nil - return s.sequenceTransactionsWithBlockMutex(header, txes, hooks) + return s.sequenceTransactionsWithBlockMutex(header, txes, hooks, espressoSovereign) }) } @@ -509,7 +523,7 @@ func (s *ExecutionEngine) SequenceTransactionsEspresso( // SequenceTransactionsWithProfiling runs SequenceTransactions with tracing and // CPU profiling enabled. If the block creation takes longer than 2 seconds, it // keeps both and prints out filenames in an error log line. -func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) { +func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, espressoSovereign bool) (*types.Block, error) { pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil) if err := pprof.StartCPUProfile(pprofBuf); err != nil { log.Error("Starting CPU profiling", "error", err) @@ -518,7 +532,7 @@ func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L log.Error("Starting tracing", "error", err) } start := time.Now() - res, err := s.SequenceTransactions(header, txes, hooks) + res, err := s.SequenceTransactions(header, txes, hooks, espressoSovereign) elapsed := time.Since(start) pprof.StopCPUProfile() trace.Stop() @@ -544,7 +558,7 @@ func writeAndLog(pprof, trace *bytes.Buffer) { log.Info("Transactions sequencing took longer than 2 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile) } -func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) { +func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, espressoSovereign bool) (*types.Block, error) { lastBlockHeader, err := s.getCurrentHeader() if err != nil { return nil, err @@ -593,7 +607,7 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes. 
return nil, nil } - msg, err := MessageFromTxes(header, txes, hooks.TxErrors) + msg, err := MessageFromTxes(header, txes, hooks.TxErrors, espressoSovereign) if err != nil { return nil, err } diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 61df8188c9..37ec44126a 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -81,12 +81,13 @@ type SequencerConfig struct { expectedSurplusHardThreshold int // Espresso specific flags - Espresso bool `koanf:"espresso"` - HotShotUrl string `koanf:"hotshot-url"` - LightClientAddress string `koanf:"light-client-address"` - EspressoNamespace uint64 `koanf:"espresso-namespace"` - StartHotShotBlock uint64 `koanf:"start-hotshot-block"` - SwitchPollInterval time.Duration `koanf:"switch-poll-interval"` + Espresso bool `koanf:"espresso"` + EnableEspressoSovereign bool `koanf:"enable-espresso-sovereign"` + HotShotUrl string `koanf:"hotshot-url"` + LightClientAddress string `koanf:"light-client-address"` + EspressoNamespace uint64 `koanf:"espresso-namespace"` + StartHotShotBlock uint64 `koanf:"start-hotshot-block"` + SwitchPollInterval time.Duration `koanf:"switch-poll-interval"` // TODO: Wrtie this into the config chain SwitchDelayThreshold uint64 `koanf:"switch-delay-threshold"` } @@ -143,6 +144,8 @@ var DefaultSequencerConfig = SequencerConfig{ ExpectedSurplusSoftThreshold: "default", ExpectedSurplusHardThreshold: "default", EnableProfiling: false, + + EnableEspressoSovereign: false, } var TestSequencerConfig = SequencerConfig{ @@ -161,6 +164,8 @@ var TestSequencerConfig = SequencerConfig{ ExpectedSurplusSoftThreshold: "default", ExpectedSurplusHardThreshold: "default", EnableProfiling: false, + + EnableEspressoSovereign: false, } func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -185,6 +190,8 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".expected-surplus-soft-threshold", DefaultSequencerConfig.ExpectedSurplusSoftThreshold, "if expected surplus is lower than this value, warnings are posted") f.String(prefix+".expected-surplus-hard-threshold", DefaultSequencerConfig.ExpectedSurplusHardThreshold, "if expected surplus is lower than this value, new incoming transactions will be denied") f.Bool(prefix+".enable-profiling", DefaultSequencerConfig.EnableProfiling, "enable CPU profiling and tracing") + + f.Bool(prefix+".enable-espresso-sovereign", DefaultSequencerConfig.EnableEspressoSovereign, "enable the espresso sovereign sequencer mode") } type txQueueItem struct { @@ -951,9 +958,9 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) { err error ) if config.EnableProfiling { - block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks) + block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks, config.EnableEspressoSovereign) } else { - block, err = s.execEngine.SequenceTransactions(header, txes, hooks) + block, err = s.execEngine.SequenceTransactions(header, txes, hooks, config.EnableEspressoSovereign) } elapsed := time.Since(start) blockCreationTimer.Update(elapsed) diff --git a/go.mod b/go.mod index 66e12162ea..471845c4ac 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ replace github.com/VictoriaMetrics/fastcache => ./fastcache replace github.com/ethereum/go-ethereum => ./go-ethereum require ( - github.com/EspressoSystems/espresso-sequencer-go v0.0.21 + github.com/EspressoSystems/espresso-sequencer-go v0.0.22 github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible 
github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/alicebob/miniredis/v2 v2.32.1 diff --git a/go.sum b/go.sum index 9fa4b33a62..7fc90570ec 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,14 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/EspressoSystems/espresso-sequencer-go v0.0.21 h1:pHLdf8qfIGMNLX3QjoPoYzJ+LUgKf30ipd+Pxcypsaw= github.com/EspressoSystems/espresso-sequencer-go v0.0.21/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801025250-0cf3306d9e7d h1:D1HT+1YLHaYgLMbWv+tQLKpqcrxUoZnfxm20dyGJDGI= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801025250-0cf3306d9e7d/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801030455-3dd69df599ff h1:BfsC+hnZNkKVSfCothZvRZFoyreN1DkzScpjsyDuAO8= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801030455-3dd69df599ff/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801062726-7dad83d7b5b2 h1:S2dkecHFNPbonPIeoSaZEv40dDYKaW9ukM31EpsGaPs= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801062726-7dad83d7b5b2/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22 h1:eroPo3SGvRAl7S5hha/bSYJZOTR7lFnDlKX5z4Ave48= +github.com/EspressoSystems/espresso-sequencer-go v0.0.22/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= diff --git a/system_tests/espresso_e2e_test.go b/system_tests/espresso_e2e_test.go index 82af0b3097..ec4bf24912 100644 --- a/system_tests/espresso_e2e_test.go +++ b/system_tests/espresso_e2e_test.go @@ -81,7 +81,6 @@ func createL2Node(ctx context.Context, t *testing.T, hotshot_url string, builder nodeConfig.BlockValidator.Enable = false nodeConfig.DelayedSequencer.Enable = true nodeConfig.Sequencer = true - nodeConfig.Espresso = true builder.execConfig.Sequencer.LightClientAddress = lightClientAddress builder.execConfig.Sequencer.SwitchPollInterval = 10 * time.Second builder.execConfig.Sequencer.SwitchDelayThreshold = uint64(delayThreshold) @@ -316,14 +315,19 @@ func waitForWith( } } -// We run one L1 node, two L2 nodes and the espresso containers in this function. 
-func runNodes(ctx context.Context, t *testing.T) (*NodeBuilder, *TestClient, *BlockchainTestInfo, func()) { - - cleanValNode := createValidationNode(ctx, t, false) - - builder, cleanup := createL1ValidatorPosterNode(ctx, t, hotShotUrl) +func waitForEspressoNode(t *testing.T, ctx context.Context) error { + return waitForWith(t, ctx, 400*time.Second, 1*time.Second, func() bool { + out, err := exec.Command("curl", "http://localhost:41000/availability/block/10", "-L").Output() + if err != nil { + log.Warn("retry to check the builder", "err", err) + return false + } + return len(out) > 0 + }) +} - err := waitFor(t, ctx, func() bool { +func waitForL1Node(t *testing.T, ctx context.Context) error { + return waitFor(t, ctx, func() bool { if e := exec.Command( "curl", "-X", @@ -338,19 +342,22 @@ func runNodes(ctx context.Context, t *testing.T) (*NodeBuilder, *TestClient, *Bl } return true }) +} + +// We run one L1 node, two L2 nodes and the espresso containers in this function. +func runNodes(ctx context.Context, t *testing.T) (*NodeBuilder, *TestClient, *BlockchainTestInfo, func()) { + + cleanValNode := createValidationNode(ctx, t, false) + + builder, cleanup := createL1ValidatorPosterNode(ctx, t, hotShotUrl) + + err := waitForL1Node(t, ctx) Require(t, err) cleanEspresso := runEspresso(t, ctx) // wait for the builder - err = waitForWith(t, ctx, 400*time.Second, 1*time.Second, func() bool { - out, err := exec.Command("curl", "http://localhost:41000/availability/block/10", "-L").Output() - if err != nil { - log.Warn("retry to check the builder", "err", err) - return false - } - return len(out) > 0 - }) + err = waitForEspressoNode(t, ctx) Require(t, err) l2Node, l2Info, cleanL2Node := createL2Node(ctx, t, hotShotUrl, builder) diff --git a/system_tests/espresso_sovereign_sequencer_test.go b/system_tests/espresso_sovereign_sequencer_test.go new file mode 100644 index 0000000000..205bc92f6b --- /dev/null +++ b/system_tests/espresso_sovereign_sequencer_test.go @@ -0,0 +1,93 @@ +package arbtest + +import ( + "context" + "fmt" + "math/big" + "testing" + "time" +) + +func createL1AndL2Node(ctx context.Context, t *testing.T) (*TestClient, *BlockchainTestInfo, func()) { + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + builder.l1StackConfig.HTTPPort = 8545 + builder.l1StackConfig.WSPort = 8546 + builder.l1StackConfig.HTTPHost = "0.0.0.0" + builder.l1StackConfig.HTTPVirtualHosts = []string{"*"} + builder.l1StackConfig.WSHost = "0.0.0.0" + builder.l1StackConfig.DataDir = t.TempDir() + builder.l1StackConfig.WSModules = append(builder.l1StackConfig.WSModules, "eth") + + builder.chainConfig.ArbitrumChainParams.EnableEspresso = true + + // poster config + builder.nodeConfig.BatchPoster.Enable = true + builder.nodeConfig.BatchPoster.ErrorDelay = 5 * time.Second + builder.nodeConfig.BatchPoster.MaxSize = 41 + builder.nodeConfig.BatchPoster.PollInterval = 10 * time.Second + builder.nodeConfig.BatchPoster.MaxDelay = -1000 * time.Hour + builder.nodeConfig.BatchPoster.LightClientAddress = lightClientAddress + builder.nodeConfig.BatchPoster.HotShotUrl = hotShotUrl + + // validator config + builder.nodeConfig.BlockValidator.Enable = true + builder.nodeConfig.BlockValidator.ValidationPoll = 2 * time.Second + builder.nodeConfig.BlockValidator.ValidationServer.URL = fmt.Sprintf("ws://127.0.0.1:%d", arbValidationPort) + builder.nodeConfig.BlockValidator.LightClientAddress = lightClientAddress + builder.nodeConfig.BlockValidator.Espresso = true + builder.nodeConfig.DelayedSequencer.Enable = false + + // sequencer config 
+ builder.nodeConfig.Sequencer = true + builder.nodeConfig.Dangerous.NoSequencerCoordinator = true + builder.execConfig.Sequencer.Enable = true + // using the sovereign sequencer + builder.execConfig.Sequencer.Espresso = false + builder.execConfig.Sequencer.EnableEspressoSovereign = true + + // transaction stream config + builder.nodeConfig.TransactionStreamer.SovereignSequencerEnabled = true + builder.nodeConfig.TransactionStreamer.EspressoNamespace = builder.chainConfig.ChainID.Uint64() + builder.nodeConfig.TransactionStreamer.HotShotUrl = hotShotUrl + + cleanup := builder.Build(t) + + mnemonic := "indoor dish desk flag debris potato excuse depart ticket judge file exit" + err := builder.L1Info.GenerateAccountWithMnemonic("CommitmentTask", mnemonic, 5) + Require(t, err) + builder.L1.TransferBalance(t, "Faucet", "CommitmentTask", big.NewInt(9e18), builder.L1Info) + return builder.L2, builder.L2Info, cleanup +} + +func TestSovereignSequencer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + valNodeCleanup := createValidationNode(ctx, t, true) + defer valNodeCleanup() + + l2Node, l2Info, cleanup := createL1AndL2Node(ctx, t) + defer cleanup() + + err := waitForL1Node(t, ctx) + Require(t, err) + + cleanEspresso := runEspresso(t, ctx) + defer cleanEspresso() + + // wait for the builder + err = waitForEspressoNode(t, ctx) + Require(t, err) + + err = checkTransferTxOnL2(t, ctx, l2Node, "User14", l2Info) + Require(t, err) + + msgCnt, err := l2Node.ConsensusNode.TxStreamer.GetMessageCount() + Require(t, err) + + err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool { + validatedCnt := l2Node.ConsensusNode.BlockValidator.Validated(t) + return validatedCnt == msgCnt + }) + Require(t, err) +} diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index ab30598b60..b2c6e94aae 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -315,6 +315,7 @@ func testBlockHashComparison(t *testing.T, blockHash *common.Hash, mustMismatch &l1IncomingMsgHeader, types.Transactions{tx}, []error{nil}, + false, ) Require(t, err)
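The standalone sketch below is not part of the patch; it only illustrates the wire framing that the new L2MessageKind_EspressoSovereignTx message appears to use, as implied by GetEspressoJstBytes and MessageFromEspressoSovereignTx above: one kind byte (11), an 8-byte big-endian length prefix, the RLP-encoded JSON justification, then the raw transaction payload. The placeholderJst type and the encode/decode helper names are hypothetical stand-ins; the real justification type is arbostypes.EspressoBlockJustification.

package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// placeholderJst stands in for arbostypes.EspressoBlockJustification, which in the
// patch carries the Espresso header, namespace proof and VID common data.
type placeholderJst struct {
	Height uint64 `json:"height"`
}

const l2MessageKindEspressoSovereignTx = 11

// encodeSovereignMsg mirrors the framing of MessageFromEspressoSovereignTx:
// kind byte, 8-byte big-endian length, RLP(JSON(jst)), then the raw tx payload.
func encodeSovereignMsg(jst placeholderJst, tx []byte) ([]byte, error) {
	jstJSON, err := json.Marshal(jst)
	if err != nil {
		return nil, err
	}
	jstBin, err := rlp.EncodeToBytes(jstJSON)
	if err != nil {
		return nil, err
	}
	msg := []byte{l2MessageKindEspressoSovereignTx}
	size := make([]byte, 8)
	binary.BigEndian.PutUint64(size, uint64(len(jstBin)))
	msg = append(msg, size...)
	msg = append(msg, jstBin...)
	msg = append(msg, tx...)
	return msg, nil
}

// decodeSovereignMsg is roughly the inverse, as parseEspressoMsg does for the
// sovereign kind: read the length-prefixed justification, the rest is the payload.
func decodeSovereignMsg(msg []byte) (placeholderJst, []byte, error) {
	var jst placeholderJst
	if len(msg) < 9 || msg[0] != l2MessageKindEspressoSovereignTx {
		return jst, nil, fmt.Errorf("not a sovereign espresso message")
	}
	size := binary.BigEndian.Uint64(msg[1:9])
	if uint64(len(msg)-9) < size {
		return jst, nil, fmt.Errorf("truncated justification")
	}
	var jstJSON []byte
	if err := rlp.DecodeBytes(msg[9:9+size], &jstJSON); err != nil {
		return jst, nil, err
	}
	if err := json.Unmarshal(jstJSON, &jst); err != nil {
		return jst, nil, err
	}
	return jst, msg[9+size:], nil
}

func main() {
	msg, err := encodeSovereignMsg(placeholderJst{Height: 42}, []byte("raw l2 tx bytes"))
	if err != nil {
		panic(err)
	}
	jst, payload, err := decodeSovereignMsg(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("jst height=%d payload=%q\n", jst.Height, payload)
}

Under this framing, parseL2Message can skip the length-prefixed justification and hand the remaining bytes to the normal L2 message parser, which is what the L2MessageKind_EspressoSovereignTx case in arbos/parse_l2.go above does.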