diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go
index 1b4305b850..7394efbe20 100644
--- a/arbnode/batch_poster.go
+++ b/arbnode/batch_poster.go
@@ -55,7 +55,6 @@ import (
 	"github.com/offchainlabs/nitro/util"
 	"github.com/offchainlabs/nitro/util/arbmath"
 	"github.com/offchainlabs/nitro/util/blobs"
-	"github.com/offchainlabs/nitro/util/dbutil"
 	"github.com/offchainlabs/nitro/util/headerreader"
 	"github.com/offchainlabs/nitro/util/redisutil"
 	"github.com/offchainlabs/nitro/util/stopwaiter"
@@ -185,6 +184,7 @@ type BatchPosterConfig struct {
 	HotShotUrl              string `koanf:"hotshot-url"`
 	UserDataAttestationFile string `koanf:"user-data-attestation-file"`
 	QuoteFile               string `koanf:"quote-file"`
+	UseEscapeHatch          bool   `koanf:"use-escape-hatch"`
 }
 
 func (c *BatchPosterConfig) Validate() error {
@@ -239,6 +239,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.Bool(prefix+".check-batch-correctness", DefaultBatchPosterConfig.CheckBatchCorrectness, "setting this to true will run the batch against an inbox multiplexer and verifies that it produces the correct set of messages")
 	f.String(prefix+".user-data-attestation-file", DefaultBatchPosterConfig.UserDataAttestationFile, "specifies the file containing the user data attestation")
 	f.String(prefix+".quote-file", DefaultBatchPosterConfig.QuoteFile, "specifies the file containing the quote")
+	f.Bool(prefix+".use-escape-hatch", DefaultBatchPosterConfig.UseEscapeHatch, "if true, batches will be posted without espresso verification while hotshot is down; if false, the batch poster waits for hotshot to come back up")
 	redislock.AddConfigOptions(prefix+".redis-lock", f)
 	dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig)
 	genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname)
@@ -272,6 +273,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
 	CheckBatchCorrectness:   true,
 	UserDataAttestationFile: "",
 	QuoteFile:               "",
+	UseEscapeHatch:          false,
 }
 
 var DefaultBatchPosterL1WalletConfig = genericconf.WalletConfig{
@@ -303,6 +305,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
 	UseAccessLists:                 true,
 	GasEstimateBaseFeeMultipleBips: arbmath.OneInUBips * 3 / 2,
 	CheckBatchCorrectness:          true,
+	UseEscapeHatch:                 false,
 }
 
 type BatchPosterOpts struct {
@@ -366,6 +369,8 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
 		opts.Streamer.lightClientReader = lightClientReader
 	}
 
+	opts.Streamer.UseEscapeHatch = opts.Config().UseEscapeHatch
+
 	b := &BatchPoster{
 		l1Reader: opts.L1Reader,
 		inbox:    opts.Inbox,
@@ -552,43 +557,63 @@ func (b *BatchPoster) checkEspressoValidation(
 		return nil
 	}
 
-	arbOSConfig, err := b.arbOSVersionGetter.GetArbOSConfigAtHeight(0)
+	lastConfirmed, err := b.streamer.getLastConfirmedPos()
 	if err != nil {
-		return fmt.Errorf("Failed call to GetArbOSConfigAtHeight: %w", err)
-	}
-	if arbOSConfig == nil {
-		return fmt.Errorf("Cannot use a nil ArbOSConfig")
+		log.Error("failed call to get last confirmed pos", "err", err)
+		return err
 	}
-	if !arbOSConfig.ArbitrumChainParams.EnableEspresso {
+
+	// This message has passed the espresso verification
+	if lastConfirmed != nil && b.building.msgCount <= *lastConfirmed {
 		return nil
 	}
 
-	hasNotSubmitted, err := b.streamer.HasNotSubmitted(b.building.msgCount)
-	if err != nil {
-		return err
-	}
-	if hasNotSubmitted {
-		// Store the pos in the database to be used later to submit the message
-		// to hotshot for finalization.
- log.Info("submitting pos", "pos", b.building.msgCount) - err = b.streamer.SubmitEspressoTransactionPos(b.building.msgCount, b.streamer.db.NewBatch()) + log.Warn("this message has not been finalized on L1 or validated") + + if b.streamer.UseEscapeHatch { + skip, err := b.streamer.getSkipVerificationPos() if err != nil { - log.Error("failed to submit espresso transaction pos", "pos", b.building.msgCount, "err", err) + log.Error("failed call to get skip verification pos", "err", err) return err } - return fmt.Errorf("this msg has not been included in hotshot") + + // Skip checking espresso validation due to hotshot failure + if skip != nil { + if b.building.msgCount <= *skip { + log.Warn("skipped espresso verification due to hotshot failure", "pos", b.building.msgCount) + return nil + } + // TODO: if current position is greater than the `skip`, should set the + // the skip value to nil. This should contribute to better efficiency. + } } - lastConfirmed, err := b.streamer.getLastConfirmedPos() - if dbutil.IsErrNotFound(err) { - return fmt.Errorf("no confirmed message has been found") + if b.streamer.HotshotDown && b.streamer.UseEscapeHatch { + log.Warn("skipped espresso verification due to hotshot failure", "pos", b.building.msgCount) + return nil } + + return fmt.Errorf("waiting for espresso finalization, pos: %d", b.building.msgCount) +} + +func (b *BatchPoster) submitEspressoTransactionPos(pos arbutil.MessageIndex) error { + hasNotSubmitted, err := b.streamer.HasNotSubmitted(pos) if err != nil { return err } - if lastConfirmed < b.building.msgCount { - return fmt.Errorf("this msg has not been finalized on L1 or validated") + if !hasNotSubmitted { + return nil + } + + // Store the pos in the database to be used later to submit the message + // to hotshot for finalization. + log.Info("submitting pos", "pos", pos) + err = b.streamer.SubmitEspressoTransactionPos(pos, b.streamer.db.NewBatch()) + if err != nil { + log.Error("failed to submit espresso transaction pos", "pos", pos, "err", err) + return err } + return nil } @@ -1417,6 +1442,30 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) } } + // Submit message positions to pending queue + if !b.streamer.UseEscapeHatch || b.streamer.shouldSubmitEspressoTransaction() { + for p := b.building.msgCount; p < msgCount; p += 1 { + msg, err := b.streamer.GetMessage(p) + if err != nil { + log.Error("error getting message from streamer", "error", err) + break + } + // We only submit the user transactions to hotshot. 
+			if msg.Message.Header.Kind != arbostypes.L1MessageType_L2Message {
+				continue
+			}
+			kind := msg.Message.L2msg[0]
+			if kind != arbos.L2MessageKind_Batch && kind != arbos.L2MessageKind_SignedTx {
+				continue
+			}
+			err = b.submitEspressoTransactionPos(p)
+			if err != nil {
+				log.Error("error submitting position", "error", err, "pos", p)
+				break
+			}
+		}
+	}
+
 	for b.building.msgCount < msgCount {
 		msg, err := b.streamer.GetMessage(b.building.msgCount)
 		if err != nil {
diff --git a/arbnode/schema.go b/arbnode/schema.go
index 0a1ca02372..3ce1a3e871 100644
--- a/arbnode/schema.go
+++ b/arbnode/schema.go
@@ -21,6 +21,7 @@ var (
 	espressoSubmittedHash        []byte = []byte("_espressoSubmittedHash")        // contains the hash of the last submitted txn
 	espressoPendingTxnsPositions []byte = []byte("_espressoPendingTxnsPositions") // contains the index of the pending txns that need to be submitted to espresso
 	espressoLastConfirmedPos     []byte = []byte("_espressoLastConfirmedPos")     // contains the position of the last confirmed message
+	espressoSkipVerificationPos  []byte = []byte("_espressoSkipVerificationPos")  // contains the position of the latest message that should skip the validation due to hotshot liveness failure
 )
 
 const currentDbSchemaVersion uint64 = 1
diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go
index 82ba8fb864..bd6de7e5ea 100644
--- a/arbnode/transaction_streamer.go
+++ b/arbnode/transaction_streamer.go
@@ -80,6 +80,9 @@ type TransactionStreamer struct {
 
 	espressoClient    *espressoClient.Client
 	lightClientReader lightclient.LightClientReaderInterface
+
+	// These fields are exported so tests can inspect and toggle them
+	HotshotDown    bool
+	UseEscapeHatch bool
 }
 
 type TransactionStreamerConfig struct {
@@ -88,30 +91,33 @@ type TransactionStreamerConfig struct {
 	ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
 
 	// Espresso specific fields
-	SovereignSequencerEnabled   bool          `koanf:"sovereign-sequencer-enabled"`
-	HotShotUrl                  string        `koanf:"hotshot-url"`
-	EspressoNamespace           uint64        `koanf:"espresso-namespace"`
-	EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"`
+	SovereignSequencerEnabled    bool          `koanf:"sovereign-sequencer-enabled"`
+	HotShotUrl                   string        `koanf:"hotshot-url"`
+	EspressoNamespace            uint64        `koanf:"espresso-namespace"`
+	EspressoTxnsPollingInterval  time.Duration `koanf:"espresso-txns-polling-interval"`
+	EspressoSwitchDelayThreshold uint64        `koanf:"espresso-switch-delay-threshold"`
 }
 
 type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig
 
 var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
-	MaxBroadcasterQueueSize:     50_000,
-	MaxReorgResequenceDepth:     1024,
-	ExecuteMessageLoopDelay:     time.Millisecond * 100,
-	SovereignSequencerEnabled:   false,
-	HotShotUrl:                  "",
-	EspressoTxnsPollingInterval: time.Millisecond * 100,
+	MaxBroadcasterQueueSize:      50_000,
+	MaxReorgResequenceDepth:      1024,
+	ExecuteMessageLoopDelay:      time.Millisecond * 100,
+	SovereignSequencerEnabled:    false,
+	HotShotUrl:                   "",
+	EspressoTxnsPollingInterval:  time.Millisecond * 100,
+	EspressoSwitchDelayThreshold: 20,
 }
 
 var TestTransactionStreamerConfig = TransactionStreamerConfig{
-	MaxBroadcasterQueueSize:     10_000,
-	MaxReorgResequenceDepth:     128 * 1024,
-	ExecuteMessageLoopDelay:     time.Millisecond,
-	SovereignSequencerEnabled:   false,
-	HotShotUrl:                  "",
-	EspressoTxnsPollingInterval: time.Millisecond * 100,
+	MaxBroadcasterQueueSize:      10_000,
+	MaxReorgResequenceDepth:      128 * 1024,
+	ExecuteMessageLoopDelay:      time.Millisecond,
+	SovereignSequencerEnabled:    false,
+	HotShotUrl:                   "",
+	EspressoTxnsPollingInterval:  time.Millisecond * 100,
+	EspressoSwitchDelayThreshold: 10,
 }
 
 func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
@@ -122,6 +128,7 @@
 	f.String(prefix+".hotshot-url", DefaultTransactionStreamerConfig.HotShotUrl, "url of the hotshot sequencer")
 	f.Uint64(prefix+".espresso-namespace", DefaultTransactionStreamerConfig.EspressoNamespace, "espresso namespace that corresponds the L2 chain")
 	f.Duration(prefix+".espresso-txns-polling-interval", DefaultTransactionStreamerConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block")
+	f.Uint64(prefix+".espresso-switch-delay-threshold", DefaultTransactionStreamerConfig.EspressoSwitchDelayThreshold, "specifies the switch delay threshold used to determine hotshot liveness")
 }
 
 func NewTransactionStreamer(
@@ -1252,25 +1259,33 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
 	return s.config().ExecuteMessageLoopDelay
 }
 
-func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) time.Duration {
+// Check if the latest submitted transaction has been finalized on L1 and verify it.
+// Return a bool indicating whether a new transaction can be submitted to HotShot
+func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Context) bool {
 	submittedTxnPos, err := s.getEspressoSubmittedPos()
 	if err != nil {
 		log.Warn("submitted pos not found", "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 	if len(submittedTxnPos) == 0 {
-		return s.config().EspressoTxnsPollingInterval
+		return true // no submitted transaction
 	}
 
 	submittedTxHash, err := s.getEspressoSubmittedHash()
 	if err != nil {
 		log.Warn("submitted hash not found", "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
-	data, err := s.espressoClient.FetchTransactionByHash(ctx, &submittedTxHash)
+	if submittedTxHash == nil {
+		// this should not happen
+		log.Warn("missing the tx hash while the submitted txn position exists")
+		return false
+	}
+
+	data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash)
 	if err != nil {
 		log.Warn("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String())
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	height := data.BlockHeight
@@ -1278,14 +1293,14 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co
 	header, err := s.espressoClient.FetchHeaderByHeight(ctx, height)
 	if err != nil {
 		log.Warn("could not get the header", "height", height, "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	// Verify the namespace proof
 	resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.config().EspressoNamespace)
 	if err != nil {
 		log.Warn("failed to fetch the transactions in block, will retry", "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	msgs := []arbostypes.L1IncomingMessage{}
@@ -1293,7 +1308,7 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co
 		msg, err := s.GetMessage(p)
 		if err != nil {
 			log.Error("failed to get the message in tx streamer", "pos", p)
-			return s.config().EspressoTxnsPollingInterval
+			return false
 		}
 		if msg.Message != nil {
 			msgs = append(msgs, *msg.Message)
 		}
 	}
 
@@ -1304,50 +1319,49 @@
 	payload, length := arbos.BuildHotShotPayload(&msgs)
 	if length != len(msgs) {
 		log.Error("failed to rebuild the hotshot payload, it is expected rebuild the transaction within all messages")
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	namespaceOk := espressocrypto.VerifyNamespace(s.chainConfig.ChainID.Uint64(), resp.Proof, *header.Header.GetPayloadCommitment(), *header.Header.GetNsTable(), []espressoTypes.Bytes{payload}, resp.VidCommon)
 	if !namespaceOk {
 		log.Error("error validating namespace proof", "height", height)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	// Verify the merkle tree proof
 	snapshot, err := s.lightClientReader.FetchMerkleRoot(height, nil)
 	if err != nil {
 		log.Warn("could not get the merkle root", "height", height, "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	if snapshot.Height <= height {
-		log.Error("got a wrong snapshot whose root height is not greater than the leaf", "height", height, "root height", snapshot.Height)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	nextHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, snapshot.Height)
 	if err != nil {
 		log.Warn("error fetching the snapshot header", "height", snapshot.Height, "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	proof, err := s.espressoClient.FetchBlockMerkleProof(ctx, snapshot.Height, height)
 	if err != nil {
-		log.Warn("error fetching the block merkle proof", "height", height, "root height", snapshot.Height)
-		return s.config().EspressoTxnsPollingInterval
+		log.Warn("error fetching the block merkle proof", "height", height, "root height", snapshot.Height, "err", err)
+		return false
 	}
 
 	blockMerkleTreeRoot := nextHeader.Header.GetBlockMerkleTreeRoot()
 	jstHeader, err := json.Marshal(header)
 	if err != nil {
 		log.Error("Failed to Marshal the header")
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	ok := espressocrypto.VerifyMerkleProof(proof.Proof, jstHeader, *blockMerkleTreeRoot, snapshot.Root)
 	if !ok {
 		log.Error("error validating merkle proof", "height", height, "snapshot height", snapshot.Height)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	// Validation completed. Update the database
@@ -1358,32 +1372,35 @@
 	err = s.setEspressoSubmittedPos(batch, nil)
 	if err != nil {
 		log.Warn("failed to set the submitted pos to nil", "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
-	err = s.setEspressoSubmittedHash(batch, tagged_base64.TaggedBase64{})
+	err = s.setEspressoSubmittedHash(batch, nil)
 	if err != nil {
 		log.Warn("failed to set the submitted hash to nil", "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	lastConfirmedPos := submittedTxnPos[len(submittedTxnPos)-1]
-	err = s.setEspressoLastConfirmedPos(batch, lastConfirmedPos)
+	err = s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos)
 	if err != nil {
 		log.Warn("failed to set the last confirmed position", "err", err, "pos", lastConfirmedPos)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	err = batch.Write()
 	if err != nil {
 		log.Error("failed to write to db", "err", err)
-		return s.config().EspressoTxnsPollingInterval
+		return false
 	}
 
 	log.Info("Finality message", "pos", submittedTxnPos, "tx", submittedTxHash.String())
-	return time.Duration(0)
+	return true
 }
 
 func (s *TransactionStreamer) getEspressoSubmittedPos() ([]arbutil.MessageIndex, error) {
 	posBytes, err := s.db.Get(espressoSubmittedPos)
 	if err != nil {
+		if dbutil.IsErrNotFound(err) {
+			return nil, nil
+		}
 		return nil, err
 	}
 
@@ -1397,43 +1414,68 @@ func (s *TransactionStreamer) getEspressoSubmittedPos() ([]arbutil.MessageIndex,
 	return pos, nil
 }
 
-func (s *TransactionStreamer) getEspressoSubmittedHash() (espressoTypes.TaggedBase64, error) {
+func (s *TransactionStreamer) getEspressoSubmittedHash() (*espressoTypes.TaggedBase64, error) {
 	posBytes, err := s.db.Get(espressoSubmittedHash)
 	if err != nil {
-		return espressoTypes.TaggedBase64{}, err
+		if dbutil.IsErrNotFound(err) {
+			return nil, nil
+		}
+		return nil, err
 	}
 	var hash string
 	err = rlp.DecodeBytes(posBytes, &hash)
 	if err != nil {
-		return espressoTypes.TaggedBase64{}, err
+		return nil, err
 	}
 	hashParsed, err := tagged_base64.Parse(hash)
 	if hashParsed == nil {
-		return espressoTypes.TaggedBase64{}, err
+		return nil, err
 	}
-	return espressoTypes.TaggedBase64(*hashParsed), nil
+	return hashParsed, nil
 }
 
-func (s *TransactionStreamer) getLastConfirmedPos() (arbutil.MessageIndex, error) {
+func (s *TransactionStreamer) getLastConfirmedPos() (*arbutil.MessageIndex, error) {
 	lastConfirmedBytes, err := s.db.Get(espressoLastConfirmedPos)
 	if err != nil {
-		return 0, err
+		if dbutil.IsErrNotFound(err) {
+			return nil, nil
+		}
+		return nil, err
 	}
 	var lastConfirmed arbutil.MessageIndex
 	err = rlp.DecodeBytes(lastConfirmedBytes, &lastConfirmed)
 	if err != nil {
-		return 0, err
+		return nil, err
 	}
-	return lastConfirmed, nil
+	return &lastConfirmed, nil
 }
 
-func (s *TransactionStreamer) getEspressoPendingTxnsPos() ([]*arbutil.MessageIndex, error) {
+func (s *TransactionStreamer) getSkipVerificationPos() (*arbutil.MessageIndex, error) {
+	lastConfirmedBytes, err := s.db.Get(espressoSkipVerificationPos)
+	if err != nil {
+		if dbutil.IsErrNotFound(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+	var skipPos arbutil.MessageIndex
+	err = rlp.DecodeBytes(lastConfirmedBytes, &skipPos)
+	if err != nil {
+		return nil, err
+	}
+	return &skipPos, nil
+}
+
+func (s *TransactionStreamer) getEspressoPendingTxnsPos() ([]arbutil.MessageIndex, error) {
 	pendingTxnsBytes, err := s.db.Get(espressoPendingTxnsPositions)
 	if err != nil {
+		if dbutil.IsErrNotFound(err) {
+			return nil, nil
+		}
 		return nil, err
 	}
-	var pendingTxnsPos []*arbutil.MessageIndex
+	var pendingTxnsPos []arbutil.MessageIndex
 	err = rlp.DecodeBytes(pendingTxnsBytes, &pendingTxnsPos)
 	if err != nil {
 		return nil, err
@@ -1441,7 +1483,7 @@ func (s *TransactionStreamer) getEspressoPendingTxnsPos() ([]*arbutil.MessageInd
 	return pendingTxnsPos, nil
 }
 
-func (s *TransactionStreamer) setEspressoSubmittedPos(batch ethdb.KeyValueWriter, pos []*arbutil.MessageIndex) error {
+func (s *TransactionStreamer) setEspressoSubmittedPos(batch ethdb.KeyValueWriter, pos []arbutil.MessageIndex) error {
 	// if pos is nil, delete the key
 	if pos == nil {
 		err := batch.Delete(espressoSubmittedPos)
@@ -1460,7 +1502,7 @@ func (s *TransactionStreamer) setEspressoSubmittedPos(batch ethdb.KeyValueWriter
 	return nil
 }
 
-func (s *TransactionStreamer) setEspressoLastConfirmedPos(batch ethdb.KeyValueWriter, pos arbutil.MessageIndex) error {
+func (s *TransactionStreamer) setEspressoLastConfirmedPos(batch ethdb.KeyValueWriter, pos *arbutil.MessageIndex) error {
 	posBytes, err := rlp.EncodeToBytes(pos)
 	if err != nil {
 		return err
@@ -1473,9 +1515,22 @@ func (s *TransactionStreamer) setEspressoLastConfirmedPos(batch ethdb.KeyValueWr
 	return nil
 }
 
-func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWriter, hash espressoTypes.TaggedBase64) error {
+func (s *TransactionStreamer) setSkipVerificationPos(batch ethdb.KeyValueWriter, pos *arbutil.MessageIndex) error {
+	posBytes, err := rlp.EncodeToBytes(pos)
+	if err != nil {
+		return err
+	}
+	err = batch.Put(espressoSkipVerificationPos, posBytes)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWriter, hash *espressoTypes.TaggedBase64) error {
 	// if hash is nil, delete the key
-	if hash.Value() == nil {
+	if hash == nil {
 		err := batch.Delete(espressoSubmittedHash)
 		return err
 	}
@@ -1492,7 +1547,7 @@ func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWrite
 	return nil
 }
 
-func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWriter, pos []*arbutil.MessageIndex) error {
+func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWriter, pos []arbutil.MessageIndex) error {
 	if pos == nil {
 		err := batch.Delete(espressoPendingTxnsPositions)
 		return err
@@ -1512,7 +1567,7 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit
 
 func (s *TransactionStreamer) HasNotSubmitted(pos arbutil.MessageIndex) (bool, error) {
 	submitted, err := s.getEspressoSubmittedPos()
-	if err != nil && !dbutil.IsErrNotFound(err) {
+	if err != nil {
 		return false, err
 	}
 
@@ -1521,11 +1576,11 @@ func (s *TransactionStreamer) HasNotSubmitted(pos arbutil.MessageIndex) (bool, e
 	}
 
 	lastConfirmed, err := s.getLastConfirmedPos()
-	if err != nil && !dbutil.IsErrNotFound(err) {
+	if err != nil {
 		return false, err
 	}
 
-	if pos <= lastConfirmed {
+	if lastConfirmed != nil && pos <= *lastConfirmed {
 		return false, nil
 	}
 
@@ -1534,7 +1589,7 @@ func (s *TransactionStreamer) HasNotSubmitted(pos arbutil.MessageIndex) (bool, e
 		return false, err
 	}
 
-	if len(pendingTxnsPos) > 0 && pos <= *pendingTxnsPos[len(pendingTxnsPos)-1] {
+	if len(pendingTxnsPos) > 0 && pos <= pendingTxnsPos[len(pendingTxnsPos)-1] {
 		return false, nil
 	}
 
@@ -1544,16 +1599,15 @@
 // Append a position to the pending queue. Please ensure this position is valid beforehand.
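+// Callers are expected to have already filtered out duplicates via
+// HasNotSubmitted; this function appends unconditionally.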
 func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex, batch ethdb.Batch) error {
 	pendingTxnsPos, err := s.getEspressoPendingTxnsPos()
-	if err != nil && !dbutil.IsErrNotFound(err) {
-		log.Error("failed to get the pending txns position", "err", err)
+	if err != nil {
 		return err
 	}
 
-	if err != nil && dbutil.IsErrNotFound(err) {
+	if pendingTxnsPos == nil {
 		// if the key doesn't exist, create a new array with the pos
-		pendingTxnsPos = []*arbutil.MessageIndex{&pos}
+		pendingTxnsPos = []arbutil.MessageIndex{pos}
 	} else {
-		pendingTxnsPos = append(pendingTxnsPos, &pos)
+		pendingTxnsPos = append(pendingTxnsPos, pos)
 	}
 
 	err = s.setEspressoPendingTxnsPos(batch, pendingTxnsPos)
 	if err != nil {
@@ -1569,20 +1623,7 @@ func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIn
 	return nil
 }
 
-func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ignored struct{}) time.Duration {
-
-	_, err := s.getEspressoSubmittedPos()
-
-	if err != nil && !dbutil.IsErrNotFound(err) {
-		log.Warn("error getting submitted pos", "err", err)
-		return s.config().EspressoTxnsPollingInterval
-	}
-
-	if err == nil {
-		if s.pollSubmittedTransactionForFinality(ctx) != time.Duration(0) {
-			return s.config().EspressoTxnsPollingInterval
-		}
-	}
+func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) time.Duration {
 
 	pendingTxnsPos, err := s.getEspressoPendingTxnsPos()
 	if err != nil {
@@ -1593,7 +1634,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig
 	// get the message at the pending txn position
 	msgs := []arbostypes.L1IncomingMessage{}
 	for _, pos := range pendingTxnsPos {
-		msg, err := s.GetMessage(*pos)
+		msg, err := s.GetMessage(pos)
 		if err != nil {
 			log.Error("failed to get espresso submitted pos", "err", err)
 			return s.config().EspressoTxnsPollingInterval
 		}
@@ -1637,7 +1678,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig
 		log.Error("failed to set the pending txns", "err", err)
 		return s.config().EspressoTxnsPollingInterval
 	}
-	err = s.setEspressoSubmittedHash(batch, *hash)
+	err = s.setEspressoSubmittedHash(batch, hash)
 	if err != nil {
 		log.Error("failed to set the submitted hash", "err", err)
 		return s.config().EspressoTxnsPollingInterval
 	}
@@ -1653,6 +1694,101 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig
 	return s.config().EspressoTxnsPollingInterval
 }
 
+func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
+	live, err := s.lightClientReader.IsHotShotLive(s.config().EspressoSwitchDelayThreshold)
+	if err != nil {
+		return err
+	}
+	// If hotshot is already marked down, the escape hatch is active; the only
+	// thing left to do is check whether hotshot is live again
+	if s.HotshotDown {
+		if live {
+			log.Info("HotShot is up, disabling the escape hatch")
+			s.HotshotDown = false
+		}
+		return nil
+	}
+
+	// Hotshot was up at the last check, so the escape hatch is disabled:
+	// - check if the escape hatch should be activated
+	// - check if the submitted transaction should skip the espresso verification
+	if !live {
+		log.Warn("enabling the escape hatch, hotshot is down")
+		s.HotshotDown = true
+	}
+
+	submittedHash, err := s.getEspressoSubmittedHash()
+	if err != nil {
+		return err
+	}
+
+	if submittedHash == nil {
+		return nil
+	}
+
+	// If a submitted transaction is waiting to be finalized, check if hotshot
+	// was live at the corresponding L1 height.
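+	// The L1 height is taken from the Espresso header that included the
+	// transaction, so liveness is evaluated as of submission time rather than
+	// at the current L1 head.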
+	data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedHash)
+	if err != nil {
+		return err
+	}
+
+	header, err := s.espressoClient.FetchHeaderByHeight(ctx, data.BlockHeight)
+	if err != nil {
+		return err
+	}
+
+	l1Height := header.Header.GetL1Head()
+	hotshotLive, err := s.lightClientReader.IsHotShotLiveAtHeight(l1Height, s.config().EspressoSwitchDelayThreshold)
+	if err != nil {
+		return err
+	}
+	if hotshotLive {
+		return nil
+	}
+	submitted, err := s.getEspressoSubmittedPos()
+	if err != nil {
+		return err
+	}
+	if len(submitted) == 0 {
+		return fmt.Errorf("the list of submitted messages should not be empty")
+	}
+
+	last := submitted[len(submitted)-1]
+
+	s.espressoTxnsStateInsertionMutex.Lock()
+	defer s.espressoTxnsStateInsertionMutex.Unlock()
+
+	batch := s.db.NewBatch()
+	if s.UseEscapeHatch {
+		// If the escape hatch is in use, record the allowed skip position in
+		// the database. The batch poster will read this and bypass the espresso
+		// validation for the affected messages
+		err = s.setEspressoSubmittedHash(batch, nil)
+		if err != nil {
+			return err
+		}
+		err = s.setEspressoSubmittedPos(batch, nil)
+		if err != nil {
+			return err
+		}
+		err = s.setEspressoPendingTxnsPos(batch, nil)
+		if err != nil {
+			return err
+		}
+		log.Warn("setting last skip verification position", "pos", last)
+		err = s.setSkipVerificationPos(batch, &last)
+		if err != nil {
+			return err
+		}
+	}
+	err = batch.Write()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct{}) time.Duration {
 	retryRate := s.config().EspressoTxnsPollingInterval * 50
 	config, err := s.exec.GetArbOSConfigAtHeight(0) // Pass 0 to get the ArbOS config at current block height.
@@ -1664,13 +1800,36 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
 		log.Error("ArbOS Config is nil")
 		return retryRate
 	}
-	if config.ArbitrumChainParams.EnableEspresso {
-		return s.submitEspressoTransactions(ctx, ignored)
+	// TODO: `SovereignSequencerEnabled` should be removed, as only the
+	// sovereign sequencer will use this function.
+	if config.ArbitrumChainParams.EnableEspresso && s.config().SovereignSequencerEnabled {
+		err := s.toggleEscapeHatch(ctx)
+		if err != nil {
+			log.Error("error checking escape hatch", "err", err)
+			return retryRate
+		}
+		canSubmit := s.pollSubmittedTransactionForFinality(ctx)
+		if canSubmit && s.shouldSubmitEspressoTransaction() {
+			return s.submitEspressoTransactions(ctx)
+		}
+
+		return s.config().EspressoTxnsPollingInterval
 	} else {
 		return retryRate
 	}
 }
 
+func (s *TransactionStreamer) shouldSubmitEspressoTransaction() bool {
+	if !s.config().SovereignSequencerEnabled {
+		// Not using hotshot as finality layer
+		return false
+	}
+	if s.HotshotDown {
+		return false
+	}
+	return true
+}
+
 func (s *TransactionStreamer) Start(ctxIn context.Context) error {
 	s.StopWaiter.Start(ctxIn, s)
 
diff --git a/system_tests/espresso_arbos_test.go b/system_tests/espresso_arbos_test.go
index 4655a79688..3ae8eda706 100644
--- a/system_tests/espresso_arbos_test.go
+++ b/system_tests/espresso_arbos_test.go
@@ -66,7 +66,7 @@ func TestEspressoArbOSConfig(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	builder, cleanup := createL1AndL2Node(ctx, t)
+	builder, cleanup := createL1AndL2Node(ctx, t, true)
 	defer cleanup()
 
 	err := waitForL1Node(ctx)
diff --git a/system_tests/espresso_e2e_test.go b/system_tests/espresso_e2e_test.go
index f48bcb2bf6..114b3d781c 100644
--- a/system_tests/espresso_e2e_test.go
+++ b/system_tests/espresso_e2e_test.go
@@ -4,16 +4,14 @@ import (
 	"context"
 	"encoding/json"
 	"math/big"
-	"os"
 	"os/exec"
 	"testing"
 	"time"
 
 	lightclient "github.com/EspressoSystems/espresso-sequencer-go/light-client"
-	lightclientmock "github.com/EspressoSystems/espresso-sequencer-go/light-client-mock"
 
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/node"
 
 	"github.com/offchainlabs/nitro/arbutil"
@@ -27,7 +25,6 @@
 var workingDir = "./espresso-e2e"
 var lightClientAddress = "0x60571c8f4b52954a24a5e7306d435e951528d963"
 var hotShotUrl = "http://127.0.0.1:41000"
-var delayThreshold uint64 = 10
 
 var (
 	jitValidationPort = 54320
@@ -185,7 +182,7 @@ func TestEspressoE2E(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	builder, cleanup := createL1AndL2Node(ctx, t)
+	builder, cleanup := createL1AndL2Node(ctx, t, true)
 	defer cleanup()
 
 	err := waitForL1Node(ctx)
@@ -279,66 +276,6 @@ func TestEspressoE2E(t *testing.T) {
 		return balance2.Cmp(transferAmount) >= 0
 	})
 	Require(t, err)
-
-	// Pause l1 height and verify that the escape hatch is working
-	checkEscapeHatch := os.Getenv("E2E_SKIP_ESCAPE_HATCH_TEST")
-	if checkEscapeHatch == "" {
-		log.Info("Checking the escape hatch")
-		// Start to check the escape hatch
-		address := common.HexToAddress(lightClientAddress)
-
-		txOpts := builder.L1Info.GetDefaultTransactOpts("Faucet", ctx)
-
-		// Freeze the l1 height
-		err := lightclientmock.FreezeL1Height(t, builder.L1.Client, address, &txOpts)
-		log.Info("waiting for light client to report hotshot is down")
-		Require(t, err)
-		err = waitForWith(ctx, 10*time.Minute, 1*time.Second, func() bool {
-			isLive, err := lightclientmock.IsHotShotLive(t, builder.L1.Client, address, uint64(delayThreshold))
-			if err != nil {
-				return false
-			}
-			return !isLive
-		})
-		Require(t, err)
-		log.Info("light client has reported that hotshot is down")
-		// Wait for the switch to be totally finished
-		currMsg, err := builder.L2.ConsensusNode.TxStreamer.GetMessageCount()
-		Require(t, err)
-		log.Info("waiting for message count", "currMsg", currMsg)
-		var validatedMsg arbutil.MessageIndex
-		err = waitForWith(ctx, 6*time.Minute, 60*time.Second, func() bool {
-			validatedCnt := builder.L2.ConsensusNode.BlockValidator.Validated(t)
-			log.Info("Validation status", "validatedCnt", validatedCnt, "msgCnt", msgCnt)
-			if validatedCnt >= currMsg {
-				validatedMsg = validatedCnt
-				return true
-			}
-			return false
-		})
-		Require(t, err)
-		err = checkTransferTxOnL2(t, ctx, l2Node, "User12", l2Info)
-		Require(t, err)
-		err = checkTransferTxOnL2(t, ctx, l2Node, "User13", l2Info)
-		Require(t, err)
-
-		err = waitForWith(ctx, 3*time.Minute, 20*time.Second, func() bool {
-			validated := builder.L2.ConsensusNode.BlockValidator.Validated(t)
-			return validated >= validatedMsg
-		})
-		Require(t, err)
-
-		// Unfreeze the l1 height
-		err = lightclientmock.UnfreezeL1Height(t, builder.L1.Client, address, &txOpts)
-		Require(t, err)
-
-		// Check if the validated count is increasing
-		err = waitForWith(ctx, 3*time.Minute, 20*time.Second, func() bool {
-			validated := builder.L2.ConsensusNode.BlockValidator.Validated(t)
-			return validated >= validatedMsg+10
-		})
-		Require(t, err)
-	}
 }
 
 func checkTransferTxOnL2(
diff --git a/system_tests/espresso_escape_hatch_test.go b/system_tests/espresso_escape_hatch_test.go
new file mode 100644
index 0000000000..735bcaab77
--- /dev/null
+++ b/system_tests/espresso_escape_hatch_test.go
@@ -0,0 +1,143 @@
+package arbtest
+
+import (
+	"context"
+	"encoding/json"
+	"os/exec"
+	"testing"
+	"time"
+
+	lightclientmock "github.com/EspressoSystems/espresso-sequencer-go/light-client-mock"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/offchainlabs/nitro/arbutil"
+)
+
+func TestEspressoEscapeHatch(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Disabling the delayed sequencer helps us check the
+	// message count easily
+	builder, cleanup := createL1AndL2Node(ctx, t, false)
+	defer cleanup()
+
+	err := waitForL1Node(ctx)
+	Require(t, err)
+
+	cleanEspresso := runEspresso()
+	defer cleanEspresso()
+
+	// wait for the builder
+	err = waitForEspressoNode(ctx)
+	Require(t, err)
+
+	l2Node := builder.L2
+	l2Info := builder.L2Info
+
+	// wait for the latest hotshot block
+	err = waitFor(ctx, func() bool {
+		out, err := exec.Command("curl", "http://127.0.0.1:41000/status/block-height", "-L").Output()
+		if err != nil {
+			return false
+		}
+		h := 0
+		err = json.Unmarshal(out, &h)
+		if err != nil {
+			return false
+		}
+		// Wait for hotshot to generate some blocks to better simulate the real-world environment.
+		// Chosen based on intuition; no empirical data supports this value.
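+		// Raising the threshold only lengthens test setup; it should not
+		// affect the assertions below.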
+		return h > 10
+	})
+	Require(t, err)
+
+	address := common.HexToAddress(lightClientAddress)
+	txOpts := builder.L1Info.GetDefaultTransactOpts("Faucet", ctx)
+
+	if builder.L2.ConsensusNode.TxStreamer.UseEscapeHatch {
+		t.Fatal("this test expects the escape hatch to be disabled at startup")
+	}
+	log.Info("testing with the escape hatch turned off")
+
+	// Start to check the escape hatch
+
+	// Freeze the l1 height
+	err = lightclientmock.FreezeL1Height(t, builder.L1.Client, address, &txOpts)
+	Require(t, err)
+	log.Info("waiting for light client to report hotshot is down")
+	err = waitForWith(ctx, 10*time.Minute, 10*time.Second, func() bool {
+		log.Info("waiting for hotshot down")
+		return builder.L2.ConsensusNode.TxStreamer.HotshotDown
+	})
+	Require(t, err)
+
+	log.Info("light client has reported that hotshot is down")
+
+	// Wait for the switch to be totally finished
+	currMsg, err := builder.L2.ConsensusNode.TxStreamer.GetMessageCount()
+	Require(t, err)
+
+	log.Info("waiting for message count", "currMsg", currMsg)
+	var validatedMsg arbutil.MessageIndex
+	err = waitForWith(ctx, 6*time.Minute, 60*time.Second, func() bool {
+		validatedCnt := builder.L2.ConsensusNode.BlockValidator.Validated(t)
+		log.Info("Validation status", "validatedCnt", validatedCnt, "currCnt", currMsg)
+		if validatedCnt >= currMsg {
+			validatedMsg = validatedCnt
+			return true
+		}
+		return false
+	})
+	Require(t, err)
+	err = checkTransferTxOnL2(t, ctx, l2Node, "User12", l2Info)
+	Require(t, err)
+	err = checkTransferTxOnL2(t, ctx, l2Node, "User13", l2Info)
+	Require(t, err)
+
+	time.Sleep(20 * time.Second)
+	validated := builder.L2.ConsensusNode.BlockValidator.Validated(t)
+	if validated > validatedMsg {
+		t.Fatal("escape hatch is disabled, so the validated message count should not keep increasing")
+	}
+
+	log.Info("bringing hotshot back up")
+	// Unfreeze the l1 height
+	err = lightclientmock.UnfreezeL1Height(t, builder.L1.Client, address, &txOpts)
+	Require(t, err)
+
+	// Check if the validated count is increasing after hotshot goes back live
+	err = waitForWith(ctx, 3*time.Minute, 20*time.Second, func() bool {
+		validated := builder.L2.ConsensusNode.BlockValidator.Validated(t)
+		return validated > validatedMsg
+	})
+	Require(t, err)
+
+	log.Info("testing the escape hatch")
+	// Modify it manually
+	builder.L2.ConsensusNode.TxStreamer.UseEscapeHatch = true
+
+	err = lightclientmock.FreezeL1Height(t, builder.L1.Client, address, &txOpts)
+	Require(t, err)
+
+	err = waitForWith(ctx, 10*time.Minute, 10*time.Second, func() bool {
+		log.Info("waiting for hotshot down")
+		return builder.L2.ConsensusNode.TxStreamer.HotshotDown
+	})
+	Require(t, err)
+
+	err = checkTransferTxOnL2(t, ctx, l2Node, "User14", l2Info)
+	Require(t, err)
+	err = checkTransferTxOnL2(t, ctx, l2Node, "User15", l2Info)
+	Require(t, err)
+	currMsg, err = builder.L2.ConsensusNode.TxStreamer.GetMessageCount()
+	Require(t, err)
+	// Escape hatch is on, so the validated count should keep increasing
+	err = waitForWith(ctx, 10*time.Minute, 10*time.Second, func() bool {
+		validated := builder.L2.ConsensusNode.BlockValidator.Validated(t)
+		return validated >= currMsg
+	})
+	Require(t, err)
+	// TODO: Find a way to check if any hotshot transaction is submitted,
+	// then set the hotshot live again.
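+	// One possible (untested) approach: query the hotshot query service for
+	// transactions in this chain's namespace submitted after the freeze and
+	// assert that none were included.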
+}
diff --git a/system_tests/espresso_finality_node_test.go b/system_tests/espresso_finality_node_test.go
index 947312a682..9749979a68 100644
--- a/system_tests/espresso_finality_node_test.go
+++ b/system_tests/espresso_finality_node_test.go
@@ -43,7 +43,7 @@ func TestEspressoFinalityNode(t *testing.T) {
 	valNodeCleanup := createValidationNode(ctx, t, true)
 	defer valNodeCleanup()
 
-	builder, cleanup := createL1AndL2Node(ctx, t)
+	builder, cleanup := createL1AndL2Node(ctx, t, true)
 	defer cleanup()
 
 	err := waitForL1Node(ctx)
diff --git a/system_tests/espresso_sovereign_sequencer_test.go b/system_tests/espresso_sovereign_sequencer_test.go
index 39b2106076..4eb61051fc 100644
--- a/system_tests/espresso_sovereign_sequencer_test.go
+++ b/system_tests/espresso_sovereign_sequencer_test.go
@@ -11,7 +11,11 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 )
 
-func createL1AndL2Node(ctx context.Context, t *testing.T) (*NodeBuilder, func()) {
+func createL1AndL2Node(
+	ctx context.Context,
+	t *testing.T,
+	delayedSequencer bool,
+) (*NodeBuilder, func()) {
 	builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
 	builder.useL1StackConfig = true // Do not overwrite the L1 stack config when building
 	builder.l1StackConfig.HTTPPort = 8545
@@ -36,7 +40,7 @@
 	builder.nodeConfig.BlockValidator.Enable = true
 	builder.nodeConfig.BlockValidator.ValidationPoll = 2 * time.Second
 	builder.nodeConfig.BlockValidator.ValidationServer.URL = fmt.Sprintf("ws://127.0.0.1:%d", arbValidationPort)
-	builder.nodeConfig.DelayedSequencer.Enable = true
+	builder.nodeConfig.DelayedSequencer.Enable = delayedSequencer
 	builder.nodeConfig.DelayedSequencer.FinalizeDistance = 1
 
 	// sequencer config
@@ -54,6 +58,7 @@
 	builder.nodeConfig.TransactionStreamer.SovereignSequencerEnabled = true
 	builder.nodeConfig.TransactionStreamer.EspressoNamespace = builder.chainConfig.ChainID.Uint64()
 	builder.nodeConfig.TransactionStreamer.HotShotUrl = hotShotUrl
+	builder.nodeConfig.TransactionStreamer.EspressoSwitchDelayThreshold = 5
 
 	cleanup := builder.Build(t)
 
@@ -72,7 +77,7 @@ func TestEspressoSovereignSequencer(t *testing.T) {
 	valNodeCleanup := createValidationNode(ctx, t, true)
 	defer valNodeCleanup()
 
-	builder, cleanup := createL1AndL2Node(ctx, t)
+	builder, cleanup := createL1AndL2Node(ctx, t, true)
 	defer cleanup()
 
 	err := waitForL1Node(ctx)
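
Reviewer note: the escape-hatch gate introduced by this diff is spread across `checkEspressoValidation` (batch poster) and `toggleEscapeHatch` / `shouldSubmitEspressoTransaction` (transaction streamer). The following is a minimal, self-contained Go sketch of the batch poster's decision rule only, using local stand-in types (`streamerState`, plain `uint64` positions) instead of the real nitro types — illustrative, not the PR's actual API.

```go
package main

import "fmt"

// streamerState models the handful of fields the batch poster consults:
// the escape-hatch config flag, the streamer's current view of HotShot
// liveness, and the two persisted positions from the database.
type streamerState struct {
	useEscapeHatch bool
	hotshotDown    bool
	lastConfirmed  *uint64 // espressoLastConfirmedPos
	skipVerifyPos  *uint64 // espressoSkipVerificationPos
}

// checkEspressoValidation mirrors the diff's logic: a message may be batched
// when it is Espresso-confirmed, when the recorded skip position covers it,
// or when HotShot is currently down and the escape hatch is enabled.
func checkEspressoValidation(s streamerState, msgCount uint64) error {
	if s.lastConfirmed != nil && msgCount <= *s.lastConfirmed {
		return nil // already passed espresso verification
	}
	if s.useEscapeHatch && s.skipVerifyPos != nil && msgCount <= *s.skipVerifyPos {
		return nil // hotshot was down when this message was submitted
	}
	if s.useEscapeHatch && s.hotshotDown {
		return nil // hotshot is down right now
	}
	return fmt.Errorf("waiting for espresso finalization, pos: %d", msgCount)
}

func main() {
	skip := uint64(7)
	s := streamerState{useEscapeHatch: true, skipVerifyPos: &skip}
	fmt.Println(checkEspressoValidation(s, 5)) // <nil>: covered by the skip position
	fmt.Println(checkEspressoValidation(s, 9)) // error: must wait for finality
}
```

Note the ordering: the confirmed-position check always runs first, so with the escape hatch disabled (the default) behavior is unchanged and posting simply blocks until Espresso finality.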