Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean flags #335

Merged
merged 6 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
submodules: recursive

- uses: cargo-bins/cargo-binstall@main
- name: Make more disk space available on public runner
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/espresso-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
submodules: recursive

- name: Install dependencies
run: >
Expand Down
9 changes: 0 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,6 @@ RUN ./download-machine.sh consensus-v30 0xb0de9cb89e4d944ae6023a3b62276e54804c24
RUN ./download-machine.sh consensus-v31 0x260f5fa5c3176a856893642e149cf128b5a8de9f828afec8d11184415dd8dc69
RUN ./download-machine.sh consensus-v32 0x184884e1eb9fefdc158f6c8ac912bb183bf3cf83f0090317e0bc4ac5860baa39

#Download Espresso WASM machine
COPY ./scripts/download-machine-espresso.sh .
# To use a new wasm machine
# 1. Create a release on github: for example YYYYMMDD-consensus
# 2. Find the module module-root.txt in the release artifacts on
# https://github.com/EspressoSystems/nitro-espresso-integration/releases
# and add the corresponding download step below.
# 3. Create a new release on github with the change: for example YYYYMMDD

FROM golang:1.21.10-bookworm AS node-builder
WORKDIR /workspace
ARG version=""
Expand Down
27 changes: 20 additions & 7 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,20 @@ type BatchPosterConfig struct {
gasRefunder common.Address
l1BlockBound l1BlockBound
// Espresso specific flags
LightClientAddress string `koanf:"light-client-address"`
HotShotUrl string `koanf:"hotshot-url"`
UserDataAttestationFile string `koanf:"user-data-attestation-file"`
QuoteFile string `koanf:"quote-file"`
UseEscapeHatch bool `koanf:"use-escape-hatch"`
LightClientAddress string `koanf:"light-client-address"`
HotShotUrl string `koanf:"hotshot-url"`
UserDataAttestationFile string `koanf:"user-data-attestation-file"`
QuoteFile string `koanf:"quote-file"`
UseEscapeHatch bool `koanf:"use-escape-hatch"`
EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"`
EspressoSwitchDelayThreshold uint64 `koanf:"espresso-switch-delay-threshold"`
}

func (c *BatchPosterConfig) Validate() error {
if (c.LightClientAddress == "") != (c.HotShotUrl == "") {
return errors.New("light client address and hotshot URL must both be set together, or both left unset")

}
if len(c.GasRefunderAddress) > 0 && !common.IsHexAddress(c.GasRefunderAddress) {
return fmt.Errorf("invalid gas refunder address \"%v\"", c.GasRefunderAddress)
}
Expand Down Expand Up @@ -240,6 +246,8 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".user-data-attestation-file", DefaultBatchPosterConfig.UserDataAttestationFile, "specifies the file containing the user data attestation")
f.String(prefix+".quote-file", DefaultBatchPosterConfig.QuoteFile, "specifies the file containing the quote")
f.Bool(prefix+".use-escape-hatch", DefaultBatchPosterConfig.UseEscapeHatch, "if true, batches will be posted without doing the espresso verification when hotshot is down. If false, wait for hotshot being up")
f.Duration(prefix+".espresso-txns-polling-interval", DefaultBatchPosterConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block")
f.Uint64(prefix+".espresso-switch-delay-threshold", DefaultBatchPosterConfig.EspressoSwitchDelayThreshold, "specifies the switch delay threshold used to determine hotshot liveness")
redislock.AddConfigOptions(prefix+".redis-lock", f)
dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig)
genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname)
Expand Down Expand Up @@ -274,6 +282,8 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
UserDataAttestationFile: "",
QuoteFile: "",
UseEscapeHatch: false,
EspressoTxnsPollingInterval: time.Millisecond * 100,
EspressoSwitchDelayThreshold: 20,
}

var DefaultBatchPosterL1WalletConfig = genericconf.WalletConfig{
Expand Down Expand Up @@ -306,6 +316,8 @@ var TestBatchPosterConfig = BatchPosterConfig{
GasEstimateBaseFeeMultipleBips: arbmath.OneInUBips * 3 / 2,
CheckBatchCorrectness: true,
UseEscapeHatch: false,
EspressoTxnsPollingInterval: time.Millisecond * 100,
EspressoSwitchDelayThreshold: 10,
}

type BatchPosterOpts struct {
Expand Down Expand Up @@ -367,10 +379,11 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
return nil, err
}
opts.Streamer.lightClientReader = lightClientReader
opts.Streamer.UseEscapeHatch = opts.Config().UseEscapeHatch
opts.Streamer.espressoTxnsPollingInterval = opts.Config().EspressoTxnsPollingInterval
opts.Streamer.espressoSwitchDelayThreshold = opts.Config().EspressoSwitchDelayThreshold
}

opts.Streamer.UseEscapeHatch = opts.Config().UseEscapeHatch

b := &BatchPoster{
l1Reader: opts.L1Reader,
inbox: opts.Inbox,
Expand Down
83 changes: 32 additions & 51 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ type TransactionStreamer struct {
broadcastServer *broadcaster.Broadcaster
inboxReader *InboxReader
delayedBridge *DelayedBridge
espressoClient *espressoClient.Client

lightClientReader lightclient.LightClientReaderInterface
// Espresso specific fields. These fields are set from batch poster
espressoClient *espressoClient.Client
lightClientReader lightclient.LightClientReaderInterface
espressoTxnsPollingInterval time.Duration
espressoSwitchDelayThreshold uint64
// Public these fields for testing
HotshotDown bool
UseEscapeHatch bool
Expand All @@ -89,46 +92,26 @@ type TransactionStreamerConfig struct {
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`

// Espresso specific fields
SovereignSequencerEnabled bool `koanf:"sovereign-sequencer-enabled"`
HotShotUrl string `koanf:"hotshot-url"`
EspressoNamespace uint64 `koanf:"espresso-namespace"`
EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"`
EspressoSwitchDelayThreshold uint64 `koanf:"espresso-switch-delay-threshold"`
}

type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig

var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
SovereignSequencerEnabled: false,
HotShotUrl: "",
EspressoTxnsPollingInterval: time.Millisecond * 100,
EspressoSwitchDelayThreshold: 20,
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
}

var TestTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
SovereignSequencerEnabled: false,
HotShotUrl: "",
EspressoTxnsPollingInterval: time.Millisecond * 100,
EspressoSwitchDelayThreshold: 10,
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
}

func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages")
f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)")
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Bool(prefix+".sovereign-sequencer-enabled", DefaultTransactionStreamerConfig.SovereignSequencerEnabled, "if true, transactions will be sent to espresso's sovereign sequencer to be notarized by espresso network")
f.String(prefix+".hotshot-url", DefaultTransactionStreamerConfig.HotShotUrl, "url of the hotshot sequencer")
f.Uint64(prefix+".espresso-namespace", DefaultTransactionStreamerConfig.EspressoNamespace, "espresso namespace that corresponds the L2 chain")
f.Duration(prefix+".espresso-txns-polling-interval", DefaultTransactionStreamerConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block")
f.Uint64(prefix+".espresso-switch-delay-threshold", DefaultTransactionStreamerConfig.EspressoSwitchDelayThreshold, "specifies the switch delay threshold used to determine hotshot liveness")
}

func NewTransactionStreamer(
Expand All @@ -151,12 +134,6 @@ func NewTransactionStreamer(
snapSyncConfig: snapSyncConfig,
}

if config().SovereignSequencerEnabled {
espressoClient := espressoClient.NewClient(config().HotShotUrl)
streamer.espressoClient = espressoClient

}

err := streamer.cleanupInconsistentState()
if err != nil {
return nil, err
Expand Down Expand Up @@ -693,6 +670,10 @@ func (s *TransactionStreamer) AddFakeInitMessage() error {
}})
}

func (s *TransactionStreamer) isEspressoMode() bool {
return s.lightClientReader != nil && s.espressoClient != nil
}

// Used in redis tests
func (s *TransactionStreamer) GetMessageCountSync(t *testing.T) (arbutil.MessageIndex, error) {
s.insertionMutex.Lock()
Expand Down Expand Up @@ -1297,7 +1278,7 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co
}

// Verify the namespace proof
resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.config().EspressoNamespace)
resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64())
if err != nil {
log.Warn("failed to fetch the transactions in block, will retry", "err", err)
return false
Expand Down Expand Up @@ -1627,7 +1608,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti

pendingTxnsPos, err := s.getEspressoPendingTxnsPos()
if err != nil {
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}

if len(pendingTxnsPos) > 0 {
Expand All @@ -1637,7 +1618,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
msg, err := s.GetMessage(pos)
if err != nil {
log.Error("failed to get espresso submitted pos", "err", err)
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}
if msg.Message != nil {
msgs = append(msgs, *msg.Message)
Expand All @@ -1646,20 +1627,20 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
payload, msgCnt := arbos.BuildHotShotPayload(&msgs)
if msgCnt == 0 {
log.Error("failed to build the hotshot transaction: a large message has exceeded the size limit")
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}

log.Info("submitting transaction to espresso using sovereign sequencer")

// Note: same key should not be used for two namespaces for this to work
hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
Payload: payload,
Namespace: s.config().EspressoNamespace,
Namespace: s.chainConfig.ChainID.Uint64(),
})

if err != nil {
log.Error("failed to submit transaction to espresso", "err", err)
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}

s.espressoTxnsStateInsertionMutex.Lock()
Expand All @@ -1670,32 +1651,32 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
err = s.setEspressoSubmittedPos(batch, submittedPos)
if err != nil {
log.Error("failed to set the submitted txn pos", "err", err)
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}
pendingTxnsPos = pendingTxnsPos[msgCnt:]
err = s.setEspressoPendingTxnsPos(batch, pendingTxnsPos)
if err != nil {
log.Error("failed to set the pending txns", "err", err)
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}
err = s.setEspressoSubmittedHash(batch, hash)
if err != nil {
log.Error("failed to set the submitted hash", "err", err)
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}

err = batch.Write()
if err != nil {
log.Error("failed to write to db", "err", err)
return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}
}

return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
}

func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
live, err := s.lightClientReader.IsHotShotLive(s.config().EspressoSwitchDelayThreshold)
live, err := s.lightClientReader.IsHotShotLive(s.espressoSwitchDelayThreshold)
if err != nil {
return err
}
Expand Down Expand Up @@ -1738,7 +1719,7 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
}

l1Height := header.Header.GetL1Head()
hotshotLive, err := s.lightClientReader.IsHotShotLiveAtHeight(l1Height, s.config().EspressoSwitchDelayThreshold)
hotshotLive, err := s.lightClientReader.IsHotShotLiveAtHeight(l1Height, s.espressoSwitchDelayThreshold)
if err != nil {
return err
}
Expand Down Expand Up @@ -1790,7 +1771,7 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
}

func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct{}) time.Duration {
retryRate := s.config().EspressoTxnsPollingInterval * 50
retryRate := s.espressoTxnsPollingInterval * 50
config, err := s.exec.GetArbOSConfigAtHeight(0) // Pass 0 to get the ArbOS config at current block height.
if err != nil {
log.Error("Error Obtaining ArbOS Config ", "err", err)
Expand All @@ -1802,7 +1783,7 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
}
// TODO: `SovereignSequencerEnabled` should be removed as it is only the sovereign sequencer
// will use this function.
if config.ArbitrumChainParams.EnableEspresso && s.config().SovereignSequencerEnabled {
if config.ArbitrumChainParams.EnableEspresso && s.isEspressoMode() {
err := s.toggleEscapeHatch(ctx)
if err != nil {
log.Error("error checking escape hatch", "err", err)
Expand All @@ -1813,14 +1794,14 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
return s.submitEspressoTransactions(ctx)
}

return s.config().EspressoTxnsPollingInterval
return s.espressoTxnsPollingInterval
} else {
return retryRate
}
}

func (s *TransactionStreamer) shouldSubmitEspressoTransaction() bool {
if !s.config().SovereignSequencerEnabled {
if !s.isEspressoMode() {
// Not using hotshot as finality layer
return false
}
Expand Down
2 changes: 1 addition & 1 deletion execution/gethexec/espresso_finality_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (n *EspressoFinalityNode) createBlock(ctx context.Context) (returnValue boo
}

hooks := arbos.NoopSequencingHooks()
_, err = n.execEngine.SequenceTransactions(arbHeader, txes, hooks, false)
_, err = n.execEngine.SequenceTransactions(arbHeader, txes, hooks)
if err != nil {
log.Error("espresso finality node: failed to sequence transactions", "err", err)
return false
Expand Down
6 changes: 3 additions & 3 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (s *ExecutionEngine) sequencerWrapper(sequencerFunc func() (*types.Block, e
}
}

func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, espressoSovereign bool) (*types.Block, error) {
func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
return s.sequencerWrapper(func() (*types.Block, error) {
hooks.TxErrors = nil
return s.sequenceTransactionsWithBlockMutex(header, txes, hooks)
Expand All @@ -510,7 +510,7 @@ func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMess
// SequenceTransactionsWithProfiling runs SequenceTransactions with tracing and
// CPU profiling enabled. If the block creation takes longer than 2 seconds, it
// keeps both and prints out filenames in an error log line.
func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, espressoSovereign bool) (*types.Block, error) {
func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
Expand All @@ -519,7 +519,7 @@ func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res, err := s.SequenceTransactions(header, txes, hooks, espressoSovereign)
res, err := s.SequenceTransactions(header, txes, hooks)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
Expand Down
Loading
Loading