Skip to content

Commit

Permalink
Add UseEscapeHatch and replace the existing UseEscapeHatch with `…
Browse files Browse the repository at this point in the history
…EnableEscapeHatch` (#396)

* Add UseEscapeHatch and replace the existing UseEscapeHatch with EnableEscapeHatch

* fix

* rename the flag and if escape hatch is not used, not to check the hotshot livenesses

* Rename error

* escape hatch

* Rename and the enabled flag should be set by users

* typo

* Remove TEE address in batch poster

* Fix escape hatch test

* Add hotshot height log

---------

Co-authored-by: ImJeremyHe <[email protected]>
  • Loading branch information
Sneh1999 and ImJeremyHe authored Dec 19, 2024
1 parent 033a7d8 commit f1da446
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 88 deletions.
16 changes: 6 additions & 10 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ type BatchPosterConfig struct {
EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"`
EspressoSwitchDelayThreshold uint64 `koanf:"espresso-switch-delay-threshold"`
EspressoMaxTransactionSize uint64 `koanf:"espresso-max-transaction-size"`
EspressoTEEVerifierAddress string `koanf:"espresso-tee-verifier-address"`
}

func (c *BatchPosterConfig) Validate() error {
Expand Down Expand Up @@ -240,10 +239,9 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Uint64(prefix+".gas-estimate-base-fee-multiple-bips", uint64(DefaultBatchPosterConfig.GasEstimateBaseFeeMultipleBips), "for gas estimation, use this multiple of the basefee (measured in basis points) as the max fee per gas")
f.Duration(prefix+".reorg-resistance-margin", DefaultBatchPosterConfig.ReorgResistanceMargin, "do not post batch if its within this duration from layer 1 minimum bounds. Requires l1-block-bound option not be set to \"ignore\"")
f.Bool(prefix+".check-batch-correctness", DefaultBatchPosterConfig.CheckBatchCorrectness, "setting this to true will run the batch against an inbox multiplexer and verifies that it produces the correct set of messages")
f.Bool(prefix+".use-escape-hatch", DefaultBatchPosterConfig.UseEscapeHatch, "if true, batches will be posted without doing the espresso verification when hotshot is down. If false, wait for hotshot being up")
f.Bool(prefix+".use-escape-hatch", DefaultBatchPosterConfig.UseEscapeHatch, "if true, Escape Hatch functionality will be used")
f.Duration(prefix+".espresso-txns-polling-interval", DefaultBatchPosterConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block")
f.Uint64(prefix+".espresso-switch-delay-threshold", DefaultBatchPosterConfig.EspressoSwitchDelayThreshold, "specifies the switch delay threshold used to determine hotshot liveness")
f.String(prefix+".espresso-tee-verifier-address", DefaultBatchPosterConfig.EspressoTEEVerifierAddress, "")
redislock.AddConfigOptions(prefix+".redis-lock", f)
dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig)
genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname)
Expand Down Expand Up @@ -282,7 +280,6 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
LightClientAddress: "",
HotShotUrl: "",
EspressoMaxTransactionSize: 900 * 1024,
EspressoTEEVerifierAddress: "",
}

var DefaultBatchPosterL1WalletConfig = genericconf.WalletConfig{
Expand Down Expand Up @@ -385,7 +382,6 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
opts.Streamer.espressoTxnsPollingInterval = opts.Config().EspressoTxnsPollingInterval
opts.Streamer.espressoSwitchDelayThreshold = opts.Config().EspressoSwitchDelayThreshold
opts.Streamer.espressoMaxTransactionSize = opts.Config().EspressoMaxTransactionSize
opts.Streamer.espressoTEEVerifierAddress = common.HexToAddress(opts.Config().EspressoTEEVerifierAddress)
}

b := &BatchPoster{
Expand Down Expand Up @@ -556,7 +552,7 @@ func AccessList(opts *AccessListOpts) types.AccessList {
return l
}

var EspressoFetchMerkleRootErr = errors.New("failed to fetch the espresso merkle roof")
var EspressoValidationErr = errors.New("failed to check espresso validation")
var EspressoFetchTransactionErr = errors.New("failed to fetch the espresso transaction")

// Adds a block merkle proof to an Espresso justification, providing a proof that a set of transactions
Expand Down Expand Up @@ -594,12 +590,12 @@ func (b *BatchPoster) checkEspressoValidation() error {
}
}

if b.streamer.HotshotDown && b.streamer.UseEscapeHatch {
if b.streamer.EscapeHatchEnabled {
log.Warn("skipped espresso verification due to hotshot failure", "pos", b.building.msgCount)
return nil
}

return fmt.Errorf("%w (height: %d)", EspressoFetchMerkleRootErr, b.building.msgCount)
return fmt.Errorf("%w (height: %d)", EspressoValidationErr, b.building.msgCount)
}

func (b *BatchPoster) submitEspressoTransactionPos(pos arbutil.MessageIndex) error {
Expand Down Expand Up @@ -1418,7 +1414,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)

// Submit message positions to pending queue
shouldSubmit := b.streamer.shouldSubmitEspressoTransaction()
if !b.streamer.UseEscapeHatch || shouldSubmit {
if shouldSubmit {
for p := b.building.msgCount; p < msgCount; p += 1 {
err = b.submitEspressoTransactionPos(p)
if err != nil {
Expand Down Expand Up @@ -1754,7 +1750,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute)
normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute)
accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute)
espressoEphemeralErrorHandler := util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour)
espressoEphemeralErrorHandler := util.NewEphemeralErrorHandler(80*time.Minute, EspressoValidationErr.Error(), time.Hour)
resetAllEphemeralErrs := func() {
commonEphemeralErrorHandler.Reset()
exceedMaxMempoolSizeEphemeralErrorHandler.Reset()
Expand Down
110 changes: 39 additions & 71 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ type TransactionStreamer struct {
espressoSwitchDelayThreshold uint64
espressoMaxTransactionSize uint64
// Public these fields for testing
HotshotDown bool
UseEscapeHatch bool
espressoTEEVerifierAddress common.Address
EscapeHatchEnabled bool
UseEscapeHatch bool
}

type TransactionStreamerConfig struct {
Expand Down Expand Up @@ -142,6 +141,7 @@ func NewTransactionStreamer(
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
EscapeHatchEnabled: false,
}

err := streamer.cleanupInconsistentState()
Expand Down Expand Up @@ -1271,18 +1271,18 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co
if err != nil {
return fmt.Errorf("failed to fetch the submitted transaction hash (hash: %s): %w", submittedTxHash.String(), err)
}

height := data.BlockHeight

header, err := s.espressoClient.FetchHeaderByHeight(ctx, height)
if err != nil {
return fmt.Errorf("could not get the header (height: %d): %w", height, err)
}

log.Info("Fetching Merkle Root at hotshot height: ", height)
// Verify the merkle proof
snapshot, err := s.lightClientReader.FetchMerkleRoot(height, nil)
if err != nil {
return fmt.Errorf("%w (height: %d): %w", EspressoFetchMerkleRootErr, height, err)
return fmt.Errorf("%w (height: %d): %w", EspressoValidationErr, height, err)
}

if snapshot.Height <= height {
Expand Down Expand Up @@ -1345,7 +1345,7 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co

batch := s.db.NewBatch()
if err := s.cleanEspressoSubmittedData(batch); err != nil {
return nil
return err
}
lastConfirmedPos := submittedTxnPos[len(submittedTxnPos)-1]
if err := s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos); err != nil {
Expand Down Expand Up @@ -1707,67 +1707,37 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
return s.espressoTxnsPollingInterval
}

func (s *TransactionStreamer) checkEspressoLiveness(ctx context.Context) error {
// Make sure useEscapeHatch is true
func (s *TransactionStreamer) checkEspressoLiveness() error {
live, err := s.lightClientReader.IsHotShotLive(s.espressoSwitchDelayThreshold)
if err != nil {
return err
}
// If hotshot is down, escape hatch is activated, the only thing is to check if hotshot is live again
if s.HotshotDown {
// If escape hatch is activated, the only thing is to check if hotshot is live again
if s.EscapeHatchEnabled {
if live {
log.Info("HotShot is up, disabling the escape hatch")
s.HotshotDown = false
s.EscapeHatchEnabled = false
}
return nil
}

// If hotshot was previously up, now it is down
if !live {
log.Warn("enabling the escape hatch, hotshot is down")
s.HotshotDown = true
}

if !s.UseEscapeHatch {
// If escape hatch is disabled, hotshot is live, everything is fine
if live {
return nil
}

submittedHash, err := s.getEspressoSubmittedHash()
if err != nil {
return err
}

// No transaction is waiting for espresso finalization
if submittedHash == nil {
return nil
}
// If escape hatch is on, and hotshot is down
log.Warn("enabling the escape hatch, hotshot is down")
s.EscapeHatchEnabled = true

// If a submitted transaction is waiting for being finalized, check if hotshot is live at
// the corresponding L1 height.
data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedHash)
if err != nil {
return err
}

header, err := s.espressoClient.FetchHeaderByHeight(ctx, data.BlockHeight)
if err != nil {
return err
}

l1Height := header.Header.GetL1Head()
hotshotLive, err := s.lightClientReader.IsHotShotLiveAtHeight(l1Height, s.espressoSwitchDelayThreshold)
if err != nil {
return err
}
if hotshotLive {
// This transaction will be still finalized
return nil
}
// Skip the espresso verification for the submitted messages
submitted, err := s.getEspressoSubmittedPos()
if err != nil {
return err
}
if len(submitted) == 0 {
return fmt.Errorf("submitted messages should not have the length of 0")
return nil
}

last := submitted[len(submitted)-1]
Expand Down Expand Up @@ -1796,7 +1766,7 @@ func (s *TransactionStreamer) checkEspressoLiveness(ctx context.Context) error {
return nil
}

var espressoMerkleProofEphemeralErrorHandler = util.NewEphemeralErrorHandler(80*time.Minute, EspressoFetchMerkleRootErr.Error(), time.Hour)
var espressoMerkleProofEphemeralErrorHandler = util.NewEphemeralErrorHandler(80*time.Minute, EspressoValidationErr.Error(), time.Hour)
var espressoTransactionEphemeralErrorHandler = util.NewEphemeralErrorHandler(3*time.Minute, EspressoFetchTransactionErr.Error(), time.Minute)

func getLogLevel(err error) func(string, ...interface{}) {
Expand All @@ -1808,9 +1778,9 @@ func getLogLevel(err error) func(string, ...interface{}) {

func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct{}) time.Duration {
retryRate := s.espressoTxnsPollingInterval * 50
enabledEspresso := s.espressoTEEVerifierAddress != common.Address{}
if enabledEspresso {
err := s.checkEspressoLiveness(ctx)
var err error
if s.UseEscapeHatch {
err = s.checkEspressoLiveness()
if err != nil {
if ctx.Err() != nil {
return 0
Expand All @@ -1819,31 +1789,29 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
logLevel("error checking escape hatch, will retry", "err", err)
return retryRate
}
err = s.pollSubmittedTransactionForFinality(ctx)
if err != nil {
if ctx.Err() != nil {
return 0
}
logLevel := getLogLevel(err)
logLevel("error polling finality, will retry", "err", err)
return retryRate
} else {
espressoMerkleProofEphemeralErrorHandler.Reset()
}

shouldSubmit := s.shouldSubmitEspressoTransaction()
if shouldSubmit {
return s.submitEspressoTransactions(ctx)
espressoTransactionEphemeralErrorHandler.Reset()
}
err = s.pollSubmittedTransactionForFinality(ctx)
if err != nil {
if ctx.Err() != nil {
return 0
}

return s.espressoTxnsPollingInterval
} else {
logLevel := getLogLevel(err)
logLevel("error polling finality, will retry", "err", err)
return retryRate
}
espressoMerkleProofEphemeralErrorHandler.Reset()

shouldSubmit := s.shouldSubmitEspressoTransaction()
if shouldSubmit {
return s.submitEspressoTransactions(ctx)
}

return s.espressoTxnsPollingInterval
}

func (s *TransactionStreamer) shouldSubmitEspressoTransaction() bool {
return !s.HotshotDown
return !s.EscapeHatchEnabled
}

func (s *TransactionStreamer) Start(ctxIn context.Context) error {
Expand Down
3 changes: 0 additions & 3 deletions system_tests/espresso_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ var workingDir = "./espresso-e2e"
// light client proxy
var lightClientAddress = "0x60571c8f4b52954a24a5e7306d435e951528d963"

// TODO: espresso TEE verifier address
var verifierAddress = "0x60571c8f4b52954a24a5e7306d435e951528d963"

var hotShotUrl = "http://127.0.0.1:41000"

var (
Expand Down
10 changes: 7 additions & 3 deletions system_tests/espresso_escape_hatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestEspressoEscapeHatch(t *testing.T) {
if builder.L2.ConsensusNode.TxStreamer.UseEscapeHatch {
t.Fatal("testing not using escape hatch first")
}
log.Info("Checking turning off the escape hatch")
log.Info("Checking turning on the escape hatch")

// Start to check the escape hatch

Expand All @@ -68,7 +68,11 @@ func TestEspressoEscapeHatch(t *testing.T) {
log.Info("waiting for light client to report hotshot is down")
err = waitForWith(ctx, 10*time.Minute, 10*time.Second, func() bool {
log.Info("waiting for hotshot down")
return builder.L2.ConsensusNode.TxStreamer.HotshotDown
live, err := lightclientmock.IsHotShotLive(t, builder.L1.Client, address, 1)
if err != nil {
log.Error("error checking hotshot live", "err", err)
}
return !live
})
Require(t, err)

Expand Down Expand Up @@ -122,7 +126,7 @@ func TestEspressoEscapeHatch(t *testing.T) {

err = waitForWith(ctx, 10*time.Minute, 10*time.Second, func() bool {
log.Info("waiting for hotshot down")
return builder.L2.ConsensusNode.TxStreamer.HotshotDown
return builder.L2.ConsensusNode.TxStreamer.EscapeHatchEnabled
})
Require(t, err)

Expand Down
2 changes: 1 addition & 1 deletion system_tests/espresso_sovereign_sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func createL1AndL2Node(
builder.nodeConfig.BatchPoster.MaxDelay = -1000 * time.Hour
builder.nodeConfig.BatchPoster.LightClientAddress = lightClientAddress
builder.nodeConfig.BatchPoster.HotShotUrl = hotShotUrl
builder.nodeConfig.BatchPoster.EspressoTEEVerifierAddress = verifierAddress
builder.nodeConfig.BatchPoster.UseEscapeHatch = false

// validator config
builder.nodeConfig.BlockValidator.Enable = true
Expand Down

0 comments on commit f1da446

Please sign in to comment.