diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 080dfea391..db18b0c035 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -12,13 +12,10 @@ import ( "fmt" "math" "math/big" - "os" "strings" "sync/atomic" "time" - "github.com/ethereum/go-ethereum/crypto" - "github.com/andybalholm/brotli" "github.com/spf13/pflag" @@ -43,7 +40,6 @@ import ( "github.com/offchainlabs/nitro/arbnode/dataposter" "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/arbnode/redislock" - "github.com/offchainlabs/nitro/arbos" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/arbstate/daprovider" @@ -183,11 +179,10 @@ type BatchPosterConfig struct { // Espresso specific flags LightClientAddress string `koanf:"light-client-address"` HotShotUrl string `koanf:"hotshot-url"` - UserDataAttestationFile string `koanf:"user-data-attestation-file"` - QuoteFile string `koanf:"quote-file"` UseEscapeHatch bool `koanf:"use-escape-hatch"` EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"` EspressoSwitchDelayThreshold uint64 `koanf:"espresso-switch-delay-threshold"` + EspressoMaxTransactionSize uint64 `koanf:"espresso-max-transaction-size"` EspressoTEEVerifierAddress string `koanf:"espresso-tee-verifier-address"` } @@ -245,8 +240,6 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Uint64(prefix+".gas-estimate-base-fee-multiple-bips", uint64(DefaultBatchPosterConfig.GasEstimateBaseFeeMultipleBips), "for gas estimation, use this multiple of the basefee (measured in basis points) as the max fee per gas") f.Duration(prefix+".reorg-resistance-margin", DefaultBatchPosterConfig.ReorgResistanceMargin, "do not post batch if its within this duration from layer 1 minimum bounds. Requires l1-block-bound option not be set to \"ignore\"") f.Bool(prefix+".check-batch-correctness", DefaultBatchPosterConfig.CheckBatchCorrectness, "setting this to true will run the batch against an inbox multiplexer and verifies that it produces the correct set of messages") - f.String(prefix+".user-data-attestation-file", DefaultBatchPosterConfig.UserDataAttestationFile, "specifies the file containing the user data attestation") - f.String(prefix+".quote-file", DefaultBatchPosterConfig.QuoteFile, "specifies the file containing the quote") f.Bool(prefix+".use-escape-hatch", DefaultBatchPosterConfig.UseEscapeHatch, "if true, batches will be posted without doing the espresso verification when hotshot is down. If false, wait for hotshot being up") f.Duration(prefix+".espresso-txns-polling-interval", DefaultBatchPosterConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block") f.Uint64(prefix+".espresso-switch-delay-threshold", DefaultBatchPosterConfig.EspressoSwitchDelayThreshold, "specifies the switch delay threshold used to determine hotshot liveness") @@ -254,6 +247,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { redislock.AddConfigOptions(prefix+".redis-lock", f) dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig) genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname) + f.Uint64(prefix+".espresso-max-transaction-size", DefaultBatchPosterConfig.EspressoMaxTransactionSize, "specifies the max size of a espresso transasction") } var DefaultBatchPosterConfig = BatchPosterConfig{ @@ -282,13 +276,12 @@ var DefaultBatchPosterConfig = BatchPosterConfig{ GasEstimateBaseFeeMultipleBips: arbmath.OneInUBips * 3 / 2, ReorgResistanceMargin: 10 * time.Minute, CheckBatchCorrectness: true, - UserDataAttestationFile: "", - QuoteFile: "", UseEscapeHatch: false, EspressoTxnsPollingInterval: time.Millisecond * 500, EspressoSwitchDelayThreshold: 350, LightClientAddress: "", HotShotUrl: "", + EspressoMaxTransactionSize: 900 * 1024, EspressoTEEVerifierAddress: "", } @@ -326,6 +319,7 @@ var TestBatchPosterConfig = BatchPosterConfig{ EspressoSwitchDelayThreshold: 10, LightClientAddress: "", HotShotUrl: "", + EspressoMaxTransactionSize: 900 * 1024, } type BatchPosterOpts struct { @@ -390,6 +384,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e opts.Streamer.UseEscapeHatch = opts.Config().UseEscapeHatch opts.Streamer.espressoTxnsPollingInterval = opts.Config().EspressoTxnsPollingInterval opts.Streamer.espressoSwitchDelayThreshold = opts.Config().EspressoSwitchDelayThreshold + opts.Streamer.espressoMaxTransactionSize = opts.Config().EspressoMaxTransactionSize opts.Streamer.espressoTEEVerifierAddress = common.HexToAddress(opts.Config().EspressoTEEVerifierAddress) } @@ -566,22 +561,11 @@ var EspressoFetchTransactionErr = errors.New("failed to fetch the espresso trans // Adds a block merkle proof to an Espresso justification, providing a proof that a set of transactions // hashes to some light client state root. -func (b *BatchPoster) checkEspressoValidation( - msg *arbostypes.MessageWithMetadata, -) error { +func (b *BatchPoster) checkEspressoValidation() error { if b.streamer.espressoClient == nil && b.streamer.lightClientReader == nil { // We are not using espresso mode since these haven't been set return nil } - // We only submit the user transactions to hotshot. Only those messages created by - // sequencer should wait for the finality - if msg.Message.Header.Kind != arbostypes.L1MessageType_L2Message { - return nil - } - kind := msg.Message.L2msg[0] - if kind != arbos.L2MessageKind_Batch && kind != arbos.L2MessageKind_SignedTx { - return nil - } lastConfirmed, err := b.streamer.getLastConfirmedPos() if err != nil { @@ -607,8 +591,6 @@ func (b *BatchPoster) checkEspressoValidation( log.Warn("skipped espresso verification due to hotshot failure", "pos", b.building.msgCount) return nil } - // TODO: if current position is greater than the `skip`, should set the - // the skip value to nil. This should contribute to better efficiency. } } @@ -1145,7 +1127,7 @@ func (b *BatchPoster) encodeAddBatch( return nil, nil, fmt.Errorf("failed to pack calldata without attestation quote: %w", err) } - attestationQuote, err := b.getAttestationQuote(calldata) + attestationQuote, err := b.streamer.getAttestationQuote(calldata) if err != nil { return nil, nil, fmt.Errorf("failed to get attestation quote: %w", err) } @@ -1176,41 +1158,6 @@ func (b *BatchPoster) encodeAddBatch( return fullCalldata, kzgBlobs, nil } -/** - * This function generates the attestation quote for the user data. - * The user data is hashed using keccak256 and then 32 bytes of padding is added to the hash. - * The hash is then written to a file specified in the config. (For SGX: /dev/attestation/user_report_data) - * The quote is then read from the file specified in the config. (For SGX: /dev/attestation/quote) - */ -func (b *BatchPoster) getAttestationQuote(userData []byte) ([]byte, error) { - if (b.config().UserDataAttestationFile == "") || (b.config().QuoteFile == "") { - return []byte{}, nil - } - - // keccak256 hash of userData - userDataHash := crypto.Keccak256(userData) - - // Add 32 bytes of padding to the user data hash - // because keccak256 hash is 32 bytes and sgx requires 64 bytes of user data - for i := 0; i < 32; i += 1 { - userDataHash = append(userDataHash, 0) - } - - // Write the message to "/dev/attestation/user_report_data" in SGX - err := os.WriteFile(b.config().UserDataAttestationFile, userDataHash, 0600) - if err != nil { - return []byte{}, fmt.Errorf("failed to create user report data file: %w", err) - } - - // Read the quote from "/dev/attestation/quote" in SGX - attestationQuote, err := os.ReadFile(b.config().QuoteFile) - if err != nil { - return []byte{}, fmt.Errorf("failed to read quote file: %w", err) - } - - return attestationQuote, nil -} - var ErrNormalGasEstimationFailed = errors.New("normal gas estimation failed") type estimateGasParams struct { @@ -1473,19 +1420,6 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) shouldSubmit := b.streamer.shouldSubmitEspressoTransaction() if !b.streamer.UseEscapeHatch || shouldSubmit { for p := b.building.msgCount; p < msgCount; p += 1 { - msg, err := b.streamer.GetMessage(p) - if err != nil { - log.Error("error getting message from streamer", "error", err) - break - } - // We only submit the user transactions to hotshot. - if msg.Message.Header.Kind != arbostypes.L1MessageType_L2Message { - continue - } - kind := msg.Message.L2msg[0] - if kind != arbos.L2MessageKind_Batch && kind != arbos.L2MessageKind_SignedTx { - continue - } err = b.submitEspressoTransactionPos(p) if err != nil { log.Error("error submitting position", "error", err, "pos", p) @@ -1524,7 +1458,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) break } - err = b.checkEspressoValidation(msg) + err = b.checkEspressoValidation() if err != nil { return false, fmt.Errorf("error checking espresso valdiation: %w", err) } diff --git a/arbnode/espresso_utils.go b/arbnode/espresso_utils.go new file mode 100644 index 0000000000..3b7f208d06 --- /dev/null +++ b/arbnode/espresso_utils.go @@ -0,0 +1,131 @@ +package arbnode + +import ( + "bytes" + "encoding/binary" + "errors" + + espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbutil" +) + +const MAX_ATTESTATION_QUOTE_SIZE int = 4 * 1024 +const LEN_SIZE int = 8 +const INDEX_SIZE int = 8 + +func buildRawHotShotPayload( + msgPositions []arbutil.MessageIndex, + msgFetcher func(arbutil.MessageIndex) ([]byte, error), + maxSize uint64, +) ([]byte, int) { + + payload := []byte{} + msgCnt := 0 + + for _, p := range msgPositions { + msgBytes, err := msgFetcher(p) + if err != nil { + log.Warn("failed to fetch the message", "pos", p) + break + } + + sizeBuf := make([]byte, LEN_SIZE) + positionBuf := make([]byte, INDEX_SIZE) + + if len(payload)+len(sizeBuf)+len(msgBytes)+len(positionBuf)+MAX_ATTESTATION_QUOTE_SIZE > int(maxSize) { + break + } + binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgBytes))) + binary.BigEndian.PutUint64(positionBuf, uint64(p)) + + // Add the submitted txn position and the size of the message along with the message + payload = append(payload, positionBuf...) + payload = append(payload, sizeBuf...) + payload = append(payload, msgBytes...) + msgCnt += 1 + } + return payload, msgCnt +} + +func signHotShotPayload( + unsigned []byte, + signer func([]byte) ([]byte, error), +) ([]byte, error) { + quote, err := signer(unsigned) + if err != nil { + return nil, err + } + + quoteSizeBuf := make([]byte, LEN_SIZE) + binary.BigEndian.PutUint64(quoteSizeBuf, uint64(len(quote))) + // Put the signature first. That would help easier parsing. + result := quoteSizeBuf + result = append(result, quote...) + result = append(result, unsigned...) + + return result, nil +} + +func validateIfPayloadIsInBlock(p []byte, payloads []espressoTypes.Bytes) bool { + validated := false + for _, payload := range payloads { + if bytes.Equal(p, payload) { + validated = true + break + } + } + return validated +} + +func ParseHotShotPayload(payload []byte) (signature []byte, indices []uint64, messages [][]byte, err error) { + if len(payload) < LEN_SIZE { + return nil, nil, nil, errors.New("payload too short to parse signature size") + } + + // Extract the signature size + signatureSize := binary.BigEndian.Uint64(payload[:LEN_SIZE]) + currentPos := LEN_SIZE + + if len(payload[currentPos:]) < int(signatureSize) { + return nil, nil, nil, errors.New("payload too short for signature") + } + + // Extract the signature + signature = payload[currentPos : currentPos+int(signatureSize)] + currentPos += int(signatureSize) + + indices = []uint64{} + messages = [][]byte{} + + // Parse messages + for { + if currentPos == len(payload) { + break + } + if len(payload[currentPos:]) < LEN_SIZE+INDEX_SIZE { + return nil, nil, nil, errors.New("remaining bytes") + } + + // Extract the index + index := binary.BigEndian.Uint64(payload[currentPos : currentPos+INDEX_SIZE]) + currentPos += INDEX_SIZE + + // Extract the message size + messageSize := binary.BigEndian.Uint64(payload[currentPos : currentPos+LEN_SIZE]) + currentPos += LEN_SIZE + + if len(payload[currentPos:]) < int(messageSize) { + return nil, nil, nil, errors.New("message size mismatch") + } + + // Extract the message + message := payload[currentPos : currentPos+int(messageSize)] + currentPos += int(messageSize) + + indices = append(indices, index) + messages = append(messages, message) + } + + return signature, indices, messages, nil +} diff --git a/arbnode/espresso_utils_test.go b/arbnode/espresso_utils_test.go new file mode 100644 index 0000000000..d91ca10e3b --- /dev/null +++ b/arbnode/espresso_utils_test.go @@ -0,0 +1,124 @@ +package arbnode + +import ( + "bytes" + "encoding/binary" + "fmt" + "testing" + + espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types" + "github.com/offchainlabs/nitro/arbutil" +) + +func mockMsgFetcher(index arbutil.MessageIndex) ([]byte, error) { + return []byte("message" + fmt.Sprint(index)), nil +} + +func TestParsePayload(t *testing.T) { + msgPositions := []arbutil.MessageIndex{1, 2, 10, 24, 100} + + rawPayload, cnt := buildRawHotShotPayload(msgPositions, mockMsgFetcher, 200*1024) + if cnt != len(msgPositions) { + t.Fatal("exceed transactions") + } + + mockSignature := []byte("fake_signature") + fakeSigner := func(payload []byte) ([]byte, error) { + return mockSignature, nil + } + signedPayload, err := signHotShotPayload(rawPayload, fakeSigner) + if err != nil { + t.Fatalf("failed to sign payload: %v", err) + } + + // Parse the signed payload + signature, indices, messages, err := ParseHotShotPayload(signedPayload) + if err != nil { + t.Fatalf("failed to parse payload: %v", err) + } + + // Validate parsed data + if !bytes.Equal(signature, mockSignature) { + t.Errorf("expected signature 'fake_signature', got %v", mockSignature) + } + + for i, index := range indices { + if arbutil.MessageIndex(index) != msgPositions[i] { + t.Errorf("expected index %d, got %d", msgPositions[i], index) + } + } + + expectedMessages := [][]byte{ + []byte("message1"), + []byte("message2"), + []byte("message10"), + []byte("message24"), + []byte("message100"), + } + for i, message := range messages { + if !bytes.Equal(message, expectedMessages[i]) { + t.Errorf("expected message %s, got %s", expectedMessages[i], message) + } + } +} + +func TestValidateIfPayloadIsInBlock(t *testing.T) { + msgPositions := []arbutil.MessageIndex{1, 2} + + rawPayload, _ := buildRawHotShotPayload(msgPositions, mockMsgFetcher, 200*1024) + fakeSigner := func(payload []byte) ([]byte, error) { + return []byte("fake_signature"), nil + } + signedPayload, err := signHotShotPayload(rawPayload, fakeSigner) + if err != nil { + t.Fatalf("failed to sign payload: %v", err) + } + + // Validate payload in a block + blockPayloads := []espressoTypes.Bytes{ + signedPayload, + []byte("other_payload"), + } + + if !validateIfPayloadIsInBlock(signedPayload, blockPayloads) { + t.Error("expected payload to be validated in block") + } + + if validateIfPayloadIsInBlock([]byte("invalid_payload"), blockPayloads) { + t.Error("did not expect invalid payload to be validated in block") + } +} + +func TestParsePayloadInvalidCases(t *testing.T) { + invalidPayloads := []struct { + description string + payload []byte + }{ + { + description: "Empty payload", + payload: []byte{}, + }, + { + description: "Message size exceeds remaining payload", + payload: func() []byte { + var payload []byte + sigSizeBuf := make([]byte, 8) + binary.BigEndian.PutUint64(sigSizeBuf, 0) + payload = append(payload, sigSizeBuf...) + msgSizeBuf := make([]byte, 8) + binary.BigEndian.PutUint64(msgSizeBuf, 100) + payload = append(payload, msgSizeBuf...) + return payload + }(), + }, + } + + for _, tc := range invalidPayloads { + t.Run(tc.description, func(t *testing.T) { + _, _, _, err := ParseHotShotPayload(tc.payload) + if err == nil { + t.Errorf("expected error for case '%s', but got none", tc.description) + } + }) + } +} diff --git a/arbnode/schema.go b/arbnode/schema.go index 3ce1a3e871..fc5308ee4e 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -19,6 +19,7 @@ var ( dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version espressoSubmittedPos []byte = []byte("_espressoSubmittedPos") // contains the current message indices of the last submitted txns espressoSubmittedHash []byte = []byte("_espressoSubmittedHash") // contains the hash of the last submitted txn + espressoSubmittedPayload []byte = []byte("_espressoSubmittedPayload") // contains the payload of the last submitted espresso txn espressoPendingTxnsPositions []byte = []byte("_espressoPendingTxnsPositions") // contains the index of the pending txns that need to be submitted to espresso espressoLastConfirmedPos []byte = []byte("_espressoLastConfirmedPos") // contains the position of the last confirmed message espressoSkipVerificationPos []byte = []byte("_espressoSkipVerificationPos") // contains the position of the latest message that should skip the validation due to hotshot liveness failure diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 7ce33d47c5..70a9eee20c 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -10,6 +10,7 @@ import ( "encoding/json" "fmt" "math/big" + "os" "reflect" "sync" "sync/atomic" @@ -27,6 +28,7 @@ import ( "errors" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" @@ -83,6 +85,7 @@ type TransactionStreamer struct { lightClientReader lightclient.LightClientReaderInterface espressoTxnsPollingInterval time.Duration espressoSwitchDelayThreshold uint64 + espressoMaxTransactionSize uint64 // Public these fields for testing HotshotDown bool UseEscapeHatch bool @@ -93,6 +96,8 @@ type TransactionStreamerConfig struct { MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"` MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"` ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"` + UserDataAttestationFile string `koanf:"user-data-attestation-file"` + QuoteFile string `koanf:"quote-file"` } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig @@ -101,6 +106,8 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 50_000, MaxReorgResequenceDepth: 1024, ExecuteMessageLoopDelay: time.Millisecond * 100, + QuoteFile: "", + UserDataAttestationFile: "", } var TestTransactionStreamerConfig = TransactionStreamerConfig{ @@ -113,6 +120,8 @@ func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") + f.String(prefix+".user-data-attestation-file", DefaultTransactionStreamerConfig.UserDataAttestationFile, "specifies the file containing the user data attestation") + f.String(prefix+".quote-file", DefaultTransactionStreamerConfig.QuoteFile, "specifies the file containing the quote") } func NewTransactionStreamer( @@ -1270,40 +1279,7 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co return fmt.Errorf("could not get the header (height: %d): %w", height, err) } - // Verify the namespace proof - resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64()) - if err != nil { - return fmt.Errorf("failed to fetch the transactions in block (height: %d): %w", height, err) - } - - msgs := []arbostypes.L1IncomingMessage{} - for _, p := range submittedTxnPos { - msg, err := s.GetMessage(p) - if err != nil { - return fmt.Errorf("failed to get the message in tx streamer (pos: %d): %w", p, err) - } - if msg.Message != nil { - msgs = append(msgs, *msg.Message) - } - } - - payload, length := buildHotShotPayload(&msgs) - if length != len(msgs) { - return errors.New("failed to rebuild the hotshot payload; the number of messages does not match the expected length") - } - - namespaceOk := espressocrypto.VerifyNamespace( - s.chainConfig.ChainID.Uint64(), - resp.Proof, - *header.Header.GetPayloadCommitment(), - *header.Header.GetNsTable(), - []espressoTypes.Bytes{payload}, - resp.VidCommon, - ) - if !namespaceOk { - return fmt.Errorf("error validating namespace proof (height: %d)", height) - } - + // Verify the merkle proof snapshot, err := s.lightClientReader.FetchMerkleRoot(height, nil) if err != nil { return fmt.Errorf("%w (height: %d): %w", EspressoFetchMerkleRootErr, height, err) @@ -1334,16 +1310,42 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co return fmt.Errorf("error validating merkle proof (height: %d, snapshot height: %d)", height, snapshot.Height) } + // Verify the namespace proof + resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64()) + if err != nil { + return fmt.Errorf("failed to fetch the transactions in block (height: %d): %w", height, err) + } + + namespaceOk := espressocrypto.VerifyNamespace( + s.chainConfig.ChainID.Uint64(), + resp.Proof, + *header.Header.GetPayloadCommitment(), + *header.Header.GetNsTable(), + resp.Transactions, + resp.VidCommon, + ) + + if !namespaceOk { + return fmt.Errorf("error validating namespace proof (height: %d)", height) + } + + submittedPayload, err := s.getEspressoSubmittedPayload() + if err != nil { + return fmt.Errorf("submitted payload not found: %w", err) + } + + validated := validateIfPayloadIsInBlock(submittedPayload, resp.Transactions) + if !validated { + return fmt.Errorf("transactions fetched from HotShot doesn't contain the submitted payload") + } + // Validation completed. Update the database s.espressoTxnsStateInsertionMutex.Lock() defer s.espressoTxnsStateInsertionMutex.Unlock() batch := s.db.NewBatch() - if err := s.setEspressoSubmittedPos(batch, nil); err != nil { - return fmt.Errorf("failed to set the submitted pos to nil: %w", err) - } - if err := s.setEspressoSubmittedHash(batch, nil); err != nil { - return fmt.Errorf("failed to set the submitted hash to nil: %w", err) + if err := s.cleanEspressoSubmittedData(batch); err != nil { + return nil } lastConfirmedPos := submittedTxnPos[len(submittedTxnPos)-1] if err := s.setEspressoLastConfirmedPos(batch, &lastConfirmedPos); err != nil { @@ -1396,6 +1398,17 @@ func (s *TransactionStreamer) getEspressoSubmittedHash() (*espressoTypes.TaggedB return hashParsed, nil } +func (s *TransactionStreamer) getEspressoSubmittedPayload() ([]byte, error) { + bytes, err := s.db.Get(espressoSubmittedPayload) + if err != nil { + if dbutil.IsErrNotFound(err) { + return nil, nil + } + return nil, err + } + return bytes, nil +} + func (s *TransactionStreamer) getLastConfirmedPos() (*arbutil.MessageIndex, error) { lastConfirmedBytes, err := s.db.Get(espressoLastConfirmedPos) if err != nil { @@ -1477,7 +1490,34 @@ func (s *TransactionStreamer) setEspressoLastConfirmedPos(batch ethdb.KeyValueWr return nil } -func (s *TransactionStreamer) setSkipVerifiactionPos(batch ethdb.KeyValueWriter, pos *arbutil.MessageIndex) error { +func (s *TransactionStreamer) cleanEspressoSubmittedData(batch ethdb.Batch) error { + if err := s.setEspressoSubmittedPos(batch, nil); err != nil { + return fmt.Errorf("failed to set the submitted pos to nil: %w", err) + } + if err := s.setEspressoSubmittedPayload(batch, nil); err != nil { + return fmt.Errorf("failed to set the submitted pos to nil: %w", err) + } + if err := s.setEspressoSubmittedHash(batch, nil); err != nil { + return fmt.Errorf("failed to set the submitted hash to nil: %w", err) + } + return nil + +} + +func (s *TransactionStreamer) setEspressoSubmittedPayload(batch ethdb.KeyValueWriter, payload []byte) error { + if payload == nil { + err := batch.Delete(espressoSubmittedHash) + return err + } + err := batch.Put(espressoSubmittedPayload, payload) + if err != nil { + return err + + } + return nil +} + +func (s *TransactionStreamer) setSkipVerificationPos(batch ethdb.KeyValueWriter, pos *arbutil.MessageIndex) error { posBytes, err := rlp.EncodeToBytes(pos) if err != nil { return err @@ -1593,21 +1633,27 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti } if len(pendingTxnsPos) > 0 { - // get the message at the pending txn position - msgs := []arbostypes.L1IncomingMessage{} - for _, pos := range pendingTxnsPos { + fetcher := func(pos arbutil.MessageIndex) ([]byte, error) { msg, err := s.GetMessage(pos) if err != nil { - log.Error("failed to get espresso submitted pos", "err", err) - return s.espressoTxnsPollingInterval + return nil, err } - if msg.Message != nil { - msgs = append(msgs, *msg.Message) + b, err := rlp.EncodeToBytes(msg) + if err != nil { + return nil, err } + return b, nil } - payload, msgCnt := buildHotShotPayload(&msgs) + + payload, msgCnt := buildRawHotShotPayload(pendingTxnsPos, fetcher, s.espressoMaxTransactionSize) if msgCnt == 0 { - log.Error("failed to build the hotshot transaction: a large message has exceeded the size limit") + log.Error("failed to build the hotshot transaction: a large message has exceeded the size limit or failed to get a message from storage", "size", s.espressoMaxTransactionSize) + return s.espressoTxnsPollingInterval + } + + payload, err = signHotShotPayload(payload, s.getAttestationQuote) + if err != nil { + log.Error("failed to sign the hotshot payload", "err", err) return s.espressoTxnsPollingInterval } @@ -1645,6 +1691,11 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti log.Error("failed to set the submitted hash", "err", err) return s.espressoTxnsPollingInterval } + err = s.setEspressoSubmittedPayload(batch, payload) + if err != nil { + log.Error("failed to set the espresso payload", "err", err) + return s.espressoTxnsPollingInterval + } err = batch.Write() if err != nil { @@ -1656,7 +1707,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti return s.espressoTxnsPollingInterval } -func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error { +func (s *TransactionStreamer) checkEspressoLiveness(ctx context.Context) error { live, err := s.lightClientReader.IsHotShotLive(s.espressoSwitchDelayThreshold) if err != nil { return err @@ -1670,19 +1721,22 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error { return nil } - // If hotshot is up, escape hatch is disabled - // - check if escape hatch should be activated - // - check if the submitted transaction should be skipped from espresso verification + // If hotshot was previously up, now it is down if !live { log.Warn("enabling the escape hatch, hotshot is down") s.HotshotDown = true } + if !s.UseEscapeHatch { + return nil + } + submittedHash, err := s.getEspressoSubmittedHash() if err != nil { return err } + // No transaction is waiting for espresso finalization if submittedHash == nil { return nil } @@ -1705,6 +1759,7 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error { return err } if hotshotLive { + // This transaction will be still finalized return nil } submitted, err := s.getEspressoSubmittedPos() @@ -1721,27 +1776,17 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error { defer s.espressoTxnsStateInsertionMutex.Unlock() batch := s.db.NewBatch() - if s.UseEscapeHatch { - // If escape hatch is used, write down the allowed skip position - // to the database. Batch poster will read this and circumvent the espresso validation - // for certain messages - err = s.setEspressoSubmittedHash(batch, nil) - if err != nil { - return err - } - err = s.setEspressoSubmittedPos(batch, nil) - if err != nil { - return err - } - err = s.setEspressoPendingTxnsPos(batch, nil) - if err != nil { - return err - } - log.Warn("setting last skip verification position", "pos", last) - err = s.setSkipVerifiactionPos(batch, &last) - if err != nil { - return err - } + // If escape hatch is used, write down the allowed skip position + // to the database. Batch poster will read this and circumvent the espresso validation + // for certain messages + err = s.cleanEspressoSubmittedData(batch) + if err != nil { + return err + } + log.Warn("setting last skip verification position", "pos", last) + err = s.setSkipVerificationPos(batch, &last) + if err != nil { + return err } err = batch.Write() if err != nil { @@ -1765,7 +1810,7 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct retryRate := s.espressoTxnsPollingInterval * 50 enabledEspresso := s.espressoTEEVerifierAddress != common.Address{} if enabledEspresso { - err := s.toggleEscapeHatch(ctx) + err := s.checkEspressoLiveness(ctx) if err != nil { if ctx.Err() != nil { return 0 @@ -1780,7 +1825,7 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct return 0 } logLevel := getLogLevel(err) - logLevel("error polling finality", "err", err) + logLevel("error polling finality, will retry", "err", err) return retryRate } else { espressoMerkleProofEphemeralErrorHandler.Reset() @@ -1816,22 +1861,37 @@ func (s *TransactionStreamer) Start(ctxIn context.Context) error { return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier) } -const ESPRESSO_TRANSACTION_SIZE_LIMIT int = 10 * 1024 +/** + * This function generates the attestation quote for the user data. + * The user data is hashed using keccak256 and then 32 bytes of padding is added to the hash. + * The hash is then written to a file specified in the config. (For SGX: /dev/attestation/user_report_data) + * The quote is then read from the file specified in the config. (For SGX: /dev/attestation/quote) + */ +func (t *TransactionStreamer) getAttestationQuote(userData []byte) ([]byte, error) { + if (t.config().UserDataAttestationFile == "") || (t.config().QuoteFile == "") { + return []byte{}, nil + } -func buildHotShotPayload(msgs *[]arbostypes.L1IncomingMessage) (espressoTypes.Bytes, int) { - payload := []byte{} - msgCnt := 0 + // keccak256 hash of userData + userDataHash := crypto.Keccak256(userData) - sizeBuf := make([]byte, 8) - for _, msg := range *msgs { - if len(payload) >= ESPRESSO_TRANSACTION_SIZE_LIMIT { - break - } - msgByte := msg.L2msg - binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgByte))) - payload = append(payload, sizeBuf...) - payload = append(payload, msgByte...) - msgCnt += 1 + // Add 32 bytes of padding to the user data hash + // because keccak256 hash is 32 bytes and sgx requires 64 bytes of user data + for i := 0; i < 32; i += 1 { + userDataHash = append(userDataHash, 0) } - return payload, msgCnt + + // Write the message to "/dev/attestation/user_report_data" in SGX + err := os.WriteFile(t.config().UserDataAttestationFile, userDataHash, 0600) + if err != nil { + return []byte{}, fmt.Errorf("failed to create user report data file: %w", err) + } + + // Read the quote from "/dev/attestation/quote" in SGX + attestationQuote, err := os.ReadFile(t.config().QuoteFile) + if err != nil { + return []byte{}, fmt.Errorf("failed to read quote file: %w", err) + } + + return attestationQuote, nil }