Skip to content

Commit

Permalink
Submit L2 Message to espresso network (#173)
Browse files Browse the repository at this point in the history
* Submit L2 Message to espresso network

* add context timeout

* check for hotshot liveness and add a polling interval to see if transactions were included in an espresso block

* cleanup

* lint

* Draft the sovereign sequencer test

* Add espresso transaction queue

* Draft the espresso submission

* Set a new l2 message type for the sovereign sequencer transactions

* Attempt to write messages again

* Fix typos and add some comments

* Fix typo

* add header to jst and submit transaction to espresso

* Update go client for development

* Confirm the transaction inclusion in hotshot

* Fix the test

* Unblock the building blocks

* Update go client

* Fix the sovereign test

* add logs

* cleanup

* lint

* Updage go client

* not to fetch merkle proof if hotshot height is 0

* Cleanup

* Fix CI

* Add CI skip tests

* remove unwanted variable

* skipping some tests to make ci pass

* skipping some tests to make ci pass

---------

Co-authored-by: ImJeremyHe <[email protected]>
  • Loading branch information
Sneh1999 and ImJeremyHe authored Aug 5, 2024
1 parent 5ff981a commit c2a4f6b
Show file tree
Hide file tree
Showing 16 changed files with 472 additions and 83 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/espresso-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,8 @@ jobs:
run: |
packages=`go list ./... | grep system_tests`
gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 35m -p 1 ./... -run 'TestEspressoE2E'
- name: Run sovereign sequencer test
run: |
packages=`go list ./... | grep system_tests`
gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencer'
19 changes: 16 additions & 3 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ func (b *BatchPoster) addEspressoBlockMerkleProof(
return err
}

if jst.Header.Height == 0 {
// This means the header in the jst is still the dummy header.
return fmt.Errorf("this msg has not been included in hotshot %v", jst.Header.Height)
}

snapshot, err := b.lightClientReader.FetchMerkleRoot(jst.Header.Height, nil)
if err != nil {
return fmt.Errorf("could not get the merkle root at height %v", jst.Header.Height)
Expand All @@ -520,10 +525,18 @@ func (b *BatchPoster) addEspressoBlockMerkleProof(
if err != nil {
return fmt.Errorf("error fetching the block merkle proof for validated height %v and leaf height %v. Request failed with error %w", snapshot.Height, jst.Header.Height, err)
}
var newMsg arbostypes.L1IncomingMessage
jst.BlockMerkleJustification = &arbostypes.BlockMerkleJustification{BlockMerkleProof: &proof, BlockMerkleComm: nextHeader.BlockMerkleTreeRoot}
newMsg, err := arbos.MessageFromEspresso(msg.Message.Header, txs, jst)
if err != nil {
return err
if arbos.IsEspressoSovereignMsg(msg.Message) {
newMsg, err = arbos.MessageFromEspressoSovereignTx(txs[0], jst, msg.Message.Header)
if err != nil {
return err
}
} else {
newMsg, err = arbos.MessageFromEspresso(msg.Message.Header, txs, jst)
if err != nil {
return err
}
}
msg.Message = &newMsg
}
Expand Down
8 changes: 1 addition & 7 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com

type Config struct {
Sequencer bool `koanf:"sequencer"`
Espresso bool `koanf:"espresso"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
Expand All @@ -100,9 +99,6 @@ type Config struct {
}

func (c *Config) Validate() error {
if c.Espresso && !c.Sequencer {
return errors.New("cannot enable espresso without enabling sequencer")
}
if c.ParentChainReader.Enable && c.Sequencer && !c.DelayedSequencer.Enable {
log.Warn("delayed sequencer is not enabled, despite sequencer and l1 reader being enabled")
}
Expand Down Expand Up @@ -149,7 +145,6 @@ func (c *Config) ValidatorRequired() bool {

func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) {
f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer")
f.Bool(prefix+".espresso", ConfigDefault.Espresso, "enable the espresso sequencer integration")
headerreader.AddOptions(prefix+".parent-chain-reader", f)
InboxReaderConfigAddOptions(prefix+".inbox-reader", f)
DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f)
Expand All @@ -168,7 +163,6 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed

var ConfigDefault = Config{
Sequencer: false,
Espresso: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
Expand Down Expand Up @@ -457,7 +451,7 @@ func createNodeImpl(
if err != nil {
return nil, err
}
} else if config.Sequencer && !config.Dangerous.NoSequencerCoordinator && !config.Espresso {
} else if config.Sequencer && !config.Dangerous.NoSequencerCoordinator {
return nil, errors.New("sequencer must be enabled with coordinator, unless dangerous.no-sequencer-coordinator set")
}
dbs := []ethdb.Database{arbDb}
Expand Down
176 changes: 162 additions & 14 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ import (
"testing"
"time"

espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client"
espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types"

"errors"

"github.com/cockroachdb/pebble"
flag "github.com/spf13/pflag"
"github.com/syndtr/goleveldb/leveldb"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
flag "github.com/spf13/pflag"
"github.com/syndtr/goleveldb/leveldb"

"github.com/offchainlabs/nitro/arbos"
"github.com/offchainlabs/nitro/arbos/arbostypes"
Expand Down Expand Up @@ -56,9 +58,11 @@ type TransactionStreamer struct {
config TransactionStreamerConfigFetcher
snapSyncConfig *SnapSyncConfig

insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex
newMessageNotifier chan struct{}
insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex

newMessageNotifier chan struct{}
newSovereignTxNotifier chan struct{}

nextAllowedFeedReorgLog time.Time

Expand All @@ -70,32 +74,54 @@ type TransactionStreamer struct {
broadcastServer *broadcaster.Broadcaster
inboxReader *InboxReader
delayedBridge *DelayedBridge
espressoClient *espressoClient.Client

pendingTxnsQueueMutex sync.Mutex // cannot be acquired while reorgMutex is held
pendingTxnsPos []arbutil.MessageIndex
submittedTxnPos *arbutil.MessageIndex
submittedTxHash *espressoTypes.TaggedBase64
}

type TransactionStreamerConfig struct {
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`

// Espresso specific fields
SovereignSequencerEnabled bool `koanf:"sovereign-sequencer-enabled"`
HotShotUrl string `koanf:"hotshot-url"`
EspressoNamespace uint64 `koanf:"espresso-namespace"`
EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"`
}

type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig

var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
SovereignSequencerEnabled: false,
HotShotUrl: "",
EspressoTxnsPollingInterval: time.Millisecond * 100,
}

var TestTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
SovereignSequencerEnabled: false,
HotShotUrl: "",
EspressoTxnsPollingInterval: time.Millisecond * 100,
}

func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages")
f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)")
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Bool(prefix+".sovereign-sequencer-enabled", DefaultTransactionStreamerConfig.SovereignSequencerEnabled, "if true, transactions will be sent to espresso's sovereign sequencer to be notarized by espresso network")
f.String(prefix+".hotshot-url", DefaultTransactionStreamerConfig.HotShotUrl, "url of the hotshot sequencer")
f.Uint64(prefix+".espresso-namespace", DefaultTransactionStreamerConfig.EspressoNamespace, "espresso namespace that corresponds the L2 chain")
f.Duration(prefix+".espresso-txns-polling-interval", DefaultTransactionStreamerConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block")
}

func NewTransactionStreamer(
Expand All @@ -117,6 +143,12 @@ func NewTransactionStreamer(
config: config,
snapSyncConfig: snapSyncConfig,
}

if config().SovereignSequencerEnabled {
espressoClient := espressoClient.NewClient(config().HotShotUrl)
streamer.espressoClient = espressoClient
}

err := streamer.cleanupInconsistentState()
if err != nil {
return nil, err
Expand Down Expand Up @@ -991,8 +1023,9 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil {
return err
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)

s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
s.SubmitEspressoTransactionPos(pos)
return nil
}

Expand Down Expand Up @@ -1156,7 +1189,6 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution
BlockHash: &msgResult.BlockHash,
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)

return pos+1 < msgCount
}

Expand All @@ -1167,7 +1199,123 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
return s.config().ExecuteMessageLoopDelay
}

func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) {
s.pendingTxnsQueueMutex.Lock()
defer s.pendingTxnsQueueMutex.Unlock()
s.pendingTxnsPos = append(s.pendingTxnsPos, pos)
}

func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Context) time.Duration {

data, err := s.espressoClient.FetchTransactionByHash(ctx, s.submittedTxHash)
if err != nil {
log.Error("failed to fetch the transaction hash", "err", err, "pos", s.submittedTxnPos)
return s.config().EspressoTxnsPollingInterval
}
// get the message at the submitted txn position
msg, err := s.getMessageWithMetadataAndBlockHash(*s.submittedTxnPos)
if err != nil {
log.Error("failed to get espresso message", "err", err)
return s.config().EspressoTxnsPollingInterval
}
// parse the message to get the transaction bytes and the justification
txns, jst, err := arbos.ParseEspressoMsg(msg.MessageWithMeta.Message)
if err != nil {
log.Error("failed to parse espresso message", "err", err)
return s.config().EspressoTxnsPollingInterval
}

espressoHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, data.BlockHeight)
if err != nil {
log.Error("espresso: failed to fetch header by height ", "err", err)
return s.config().EspressoTxnsPollingInterval
}

// fetch the namespace proof and vid common. Should use a more efficient way
resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, data.BlockHeight, s.config().EspressoNamespace)
if err != nil {
log.Warn("failed to fetch the transactions in block, will retry", "err", err)
return s.config().EspressoTxnsPollingInterval
}

// Filling in the block justification with the header
jst.Header = espressoHeader
jst.Proof = &resp.Proof
jst.VidCommon = &resp.VidCommon
// create a new message with the header and the txn and the updated block justification
newMsg, err := arbos.MessageFromEspressoSovereignTx(txns[0], jst, msg.MessageWithMeta.Message.Header)
if err != nil {
return s.config().EspressoTxnsPollingInterval
}
msg.MessageWithMeta.Message = &newMsg
batch := s.db.NewBatch()
err = s.writeMessage(*s.submittedTxnPos, *msg, batch)
if err != nil {
return s.config().EspressoTxnsPollingInterval
}
err = batch.Write()
if err != nil {
return s.config().EspressoTxnsPollingInterval
}
s.submittedTxnPos = nil
s.submittedTxHash = nil
return time.Duration(0)
}

func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ignored struct{}) time.Duration {
if s.submittedTxnPos != nil {
if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) {
return s.config().EspressoTxnsPollingInterval
}
}

s.pendingTxnsQueueMutex.Lock()
defer s.pendingTxnsQueueMutex.Unlock()
if s.submittedTxnPos == nil && len(s.pendingTxnsPos) > 0 {
// get the message at the pending txn position
msg, err := s.GetMessage(s.pendingTxnsPos[0])
if err != nil {
log.Error("failed to get espresso submitted pos", "err", err)
return s.config().EspressoTxnsPollingInterval
}
bytes, _, err := arbos.ParseEspressoMsg(msg.Message)
if err != nil {
log.Error("failed to parse espresso message before submitting", "err", err)
return s.config().EspressoTxnsPollingInterval
}

espressoTx := espressoTypes.Transaction{
Payload: bytes[0],
Namespace: s.config().EspressoNamespace,
}

log.Info("Submitting transaction to espresso using sovereign sequencer", "tx", espressoTx)

hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
Payload: bytes[0],
Namespace: s.config().EspressoNamespace,
})

if err != nil {
log.Error("failed to submit transaction to espresso", "err", err)
return s.config().EspressoTxnsPollingInterval
}
s.submittedTxnPos = &s.pendingTxnsPos[0]
s.pendingTxnsPos = s.pendingTxnsPos[1:]
s.submittedTxHash = hash
}
return s.config().EspressoTxnsPollingInterval
}

func (s *TransactionStreamer) Start(ctxIn context.Context) error {
s.StopWaiter.Start(ctxIn, s)

if s.config().SovereignSequencerEnabled {
err := stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.submitEspressoTransactions, s.newSovereignTxNotifier)
if err != nil {
return err
}
}

return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier)
}
2 changes: 1 addition & 1 deletion arbos/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func ProduceBlock(

var espressoHeader *espressoTypes.Header
if chainConfig.ArbitrumChainParams.EnableEspresso {
if IsEspressoMsg(message) {
if IsEspressoMsg(message) && !IsEspressoSovereignMsg(message) {
// creating a block with espresso message
_, jst, err := ParseEspressoMsg(message)
if err != nil {
Expand Down
Loading

0 comments on commit c2a4f6b

Please sign in to comment.