From 1af1274768b7e51e61342da113caedab191abc98 Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Mon, 5 Aug 2024 15:05:56 -0400 Subject: [PATCH 01/11] Persistence storage for sovereign builder --- arbnode/schema.go | 11 +- arbnode/transaction_streamer.go | 200 ++++++++++++++++++++++++++++---- 2 files changed, 182 insertions(+), 29 deletions(-) diff --git a/arbnode/schema.go b/arbnode/schema.go index 2854b7e785..4010de20d0 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -12,10 +12,13 @@ var ( sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count - messageCountKey []byte = []byte("_messageCount") // contains the current message count - delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count - sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count - dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version + messageCountKey []byte = []byte("_messageCount") // contains the current message count + delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count + sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count + dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version + espressoSubmittedPos []byte = []byte("_espressoSubmittedPos") // contains the current message index of the last submitted txn + espressoSubmittedHash []byte = []byte("_espressoSubmittedHash") // contains the hash of the last submitted txn + espressoPendingTxnsPos []byte = []byte("_espressoPendingTxnsPos") // contains the index of the pending txns that need to be submitted to espresso ) const currentDbSchemaVersion uint64 = 1 diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 6f6e0bd437..48d9255007 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -77,9 +77,6 @@ type TransactionStreamer struct { espressoClient *espressoClient.Client pendingTxnsQueueMutex sync.Mutex // cannot be acquired while reorgMutex is held - pendingTxnsPos []arbutil.MessageIndex - submittedTxnPos *arbutil.MessageIndex - submittedTxHash *espressoTypes.TaggedBase64 } type TransactionStreamerConfig struct { @@ -1199,21 +1196,22 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc return s.config().ExecuteMessageLoopDelay } -func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) { - s.pendingTxnsQueueMutex.Lock() - defer s.pendingTxnsQueueMutex.Unlock() - s.pendingTxnsPos = append(s.pendingTxnsPos, pos) -} - func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Context) time.Duration { - - data, err := s.espressoClient.FetchTransactionByHash(ctx, s.submittedTxHash) + submittedTxnPos, err := s.getEspressoSubmittedPos() + if err != nil { + return s.config().EspressoTxnsPollingInterval + } + submittedTxHash, err := s.getEspressoSubmittedHash() if err != nil { - log.Error("failed to fetch the transaction hash", "err", err, "pos", s.submittedTxnPos) + return s.config().EspressoTxnsPollingInterval + } + data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash) + if err != nil { + log.Error("failed to fetch the transaction hash", "err", err, "pos", submittedTxnPos) return s.config().EspressoTxnsPollingInterval } // get the message at the submitted txn position - msg, err := s.getMessageWithMetadataAndBlockHash(*s.submittedTxnPos) + msg, err := s.getMessageWithMetadataAndBlockHash(submittedTxnPos) if err != nil { log.Error("failed to get espresso message", "err", err) return s.config().EspressoTxnsPollingInterval @@ -1249,31 +1247,164 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co } msg.MessageWithMeta.Message = &newMsg batch := s.db.NewBatch() - err = s.writeMessage(*s.submittedTxnPos, *msg, batch) + err = s.writeMessage(submittedTxnPos, *msg, batch) + if err != nil { + return s.config().EspressoTxnsPollingInterval + } + err = s.setEspressoSubmittedPos(batch, nil) if err != nil { return s.config().EspressoTxnsPollingInterval } + err = s.setEspressoSubmittedHash(batch, nil) + if err != nil { + return s.config().EspressoTxnsPollingInterval + } + err = batch.Write() if err != nil { return s.config().EspressoTxnsPollingInterval } - s.submittedTxnPos = nil - s.submittedTxHash = nil return time.Duration(0) } +func (s *TransactionStreamer) getEspressoSubmittedPos() (arbutil.MessageIndex, error) { + + posBytes, err := s.db.Get(espressoSubmittedPos) + if err != nil { + return 0, err + } + + var pos uint64 + err = rlp.DecodeBytes(posBytes, &pos) + + if err != nil { + return 0, err + } + + return arbutil.MessageIndex(pos), nil +} + +func (s *TransactionStreamer) getEspressoSubmittedHash() (*espressoTypes.TaggedBase64, error) { + posBytes, err := s.db.Get(espressoSubmittedHash) + if err != nil { + return nil, err + } + var hash espressoTypes.TaggedBase64 + err = rlp.DecodeBytes(posBytes, &hash) + if err != nil { + return nil, err + } + return &hash, nil +} + +func (s *TransactionStreamer) getEspressoPendingTxnsPos() (*[]arbutil.MessageIndex, error) { + pendingTxnsBytes, err := s.db.Get(espressoPendingTxnsPos) + if err != nil { + return nil, err + } + var pendingTxnsPos []arbutil.MessageIndex + err = rlp.DecodeBytes(pendingTxnsBytes, &pendingTxnsPos) + if err != nil { + return nil, err + } + return &pendingTxnsPos, nil +} + +func (s *TransactionStreamer) setEspressoSubmittedPos(batch ethdb.KeyValueWriter, pos *arbutil.MessageIndex) error { + // if pos is nil, delete the key + if pos == nil { + err := batch.Delete(espressoSubmittedPos) + return err + } + + posBytes, err := rlp.EncodeToBytes(pos) + if err != nil { + return err + } + err = batch.Put(espressoSubmittedPos, posBytes) + if err != nil { + return err + } + + return nil +} + +func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWriter, hash *espressoTypes.TaggedBase64) error { + // if hash is nil, delete the key + if hash == nil { + err := batch.Delete(espressoSubmittedHash) + return err + } + + hashBytes, err := rlp.EncodeToBytes(hash) + if err != nil { + return err + } + err = batch.Put(espressoSubmittedHash, hashBytes) + if err != nil { + return err + } + + return nil +} + +func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWriter, pos *[]arbutil.MessageIndex) error { + if pos == nil { + err := batch.Delete(espressoPendingTxnsPos) + return err + } + + posBytes, err := rlp.EncodeToBytes(pos) + if err != nil { + return err + } + err = batch.Put(espressoPendingTxnsPos, posBytes) + if err != nil { + return err + + } + return nil +} + +func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) error { + pendingTxnsPos, err := s.getEspressoPendingTxnsPos() + if err != nil { + log.Error("failed to get the pending txns", "err", err) + return err + + } + *pendingTxnsPos = append(*pendingTxnsPos, pos) + err = s.setEspressoPendingTxnsPos(s.db.NewBatch(), pendingTxnsPos) + if err != nil { + log.Error("failed to set the pending txns", "err", err) + return err + } + + return nil +} + func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ignored struct{}) time.Duration { - if s.submittedTxnPos != nil { - if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) { - return s.config().EspressoTxnsPollingInterval - } + + _, err := s.getEspressoSubmittedPos() + if err != nil { + log.Error("failed to get espresso submitted pos", "err", err) + return s.config().EspressoTxnsPollingInterval + } + + if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) { + return s.config().EspressoTxnsPollingInterval } s.pendingTxnsQueueMutex.Lock() defer s.pendingTxnsQueueMutex.Unlock() - if s.submittedTxnPos == nil && len(s.pendingTxnsPos) > 0 { + pendingTxnsPos, err := s.getEspressoPendingTxnsPos() + if err != nil || pendingTxnsPos == nil { + log.Error("failed to get pending txns", "err", err) + return s.config().EspressoTxnsPollingInterval + } + if len(*pendingTxnsPos) > 0 { // get the message at the pending txn position - msg, err := s.GetMessage(s.pendingTxnsPos[0]) + msg, err := s.GetMessage((*pendingTxnsPos)[0]) if err != nil { log.Error("failed to get espresso submitted pos", "err", err) return s.config().EspressoTxnsPollingInterval @@ -1300,10 +1431,29 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig log.Error("failed to submit transaction to espresso", "err", err) return s.config().EspressoTxnsPollingInterval } - s.submittedTxnPos = &s.pendingTxnsPos[0] - s.pendingTxnsPos = s.pendingTxnsPos[1:] - s.submittedTxHash = hash + batch := s.db.NewBatch() + err = s.setEspressoSubmittedPos(batch, &(*pendingTxnsPos)[0]) + if err != nil { + log.Error("failed to set the submitted txn pos", "err", err) + return s.config().EspressoTxnsPollingInterval + } + *pendingTxnsPos = (*pendingTxnsPos)[1:] + err = s.setEspressoPendingTxnsPos(batch, pendingTxnsPos) + if err != nil { + log.Error("failed to set the pending txns", "err", err) + return s.config().EspressoTxnsPollingInterval + } + err = s.setEspressoSubmittedHash(batch, hash) + if err != nil { + log.Error("failed to set the submitted hash", "err", err) + return s.config().EspressoTxnsPollingInterval + } + err = batch.Write() + if err != nil { + return s.config().EspressoTxnsPollingInterval + } } + return s.config().EspressoTxnsPollingInterval } From 961bb478fa7dfbb06affab9d194bd68717922865 Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Mon, 5 Aug 2024 16:03:50 -0400 Subject: [PATCH 02/11] add unit test --- arbnode/schema.go | 14 ++--- arbnode/transaction_streamer.go | 56 +++++++++++-------- .../espresso_sovereign_sequencer_test.go | 37 ++++++++++++ 3 files changed, 76 insertions(+), 31 deletions(-) diff --git a/arbnode/schema.go b/arbnode/schema.go index 4010de20d0..9099c1f022 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -12,13 +12,13 @@ var ( sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count - messageCountKey []byte = []byte("_messageCount") // contains the current message count - delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count - sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count - dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version - espressoSubmittedPos []byte = []byte("_espressoSubmittedPos") // contains the current message index of the last submitted txn - espressoSubmittedHash []byte = []byte("_espressoSubmittedHash") // contains the hash of the last submitted txn - espressoPendingTxnsPos []byte = []byte("_espressoPendingTxnsPos") // contains the index of the pending txns that need to be submitted to espresso + messageCountKey []byte = []byte("_messageCount") // contains the current message count + delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count + sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count + dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version + espressoSubmittedPos []byte = []byte("_espressoSubmittedPos") // contains the current message index of the last submitted txn + espressoSubmittedHash []byte = []byte("_espressoSubmittedHash") // contains the hash of the last submitted txn + espressoPendingTxnsPositions []byte = []byte("_espressoPendingTxnsPositions") // contains the index of the pending txns that need to be submitted to espresso ) const currentDbSchemaVersion uint64 = 1 diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 48d9255007..961ae9ece1 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -75,8 +75,6 @@ type TransactionStreamer struct { inboxReader *InboxReader delayedBridge *DelayedBridge espressoClient *espressoClient.Client - - pendingTxnsQueueMutex sync.Mutex // cannot be acquired while reorgMutex is held } type TransactionStreamerConfig struct { @@ -1022,7 +1020,10 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) - s.SubmitEspressoTransactionPos(pos) + err = s.SubmitEspressoTransactionPos(pos) + if err != nil { + return err + } return nil } @@ -1199,21 +1200,23 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Context) time.Duration { submittedTxnPos, err := s.getEspressoSubmittedPos() if err != nil { + log.Warn("submitted pos not found", "err", err) return s.config().EspressoTxnsPollingInterval } submittedTxHash, err := s.getEspressoSubmittedHash() if err != nil { + log.Warn("submitted hash not found", "err", err) return s.config().EspressoTxnsPollingInterval } data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash) if err != nil { - log.Error("failed to fetch the transaction hash", "err", err, "pos", submittedTxnPos) + log.Error("failed to fetch the submitted transaction hash", "err", err, "pos", submittedTxnPos) return s.config().EspressoTxnsPollingInterval } // get the message at the submitted txn position msg, err := s.getMessageWithMetadataAndBlockHash(submittedTxnPos) if err != nil { - log.Error("failed to get espresso message", "err", err) + log.Error("failed to get espresso message at submitted txn pos", "err", err) return s.config().EspressoTxnsPollingInterval } // parse the message to get the transaction bytes and the justification @@ -1240,23 +1243,28 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co jst.Header = espressoHeader jst.Proof = &resp.Proof jst.VidCommon = &resp.VidCommon + // create a new message with the header and the txn and the updated block justification newMsg, err := arbos.MessageFromEspressoSovereignTx(txns[0], jst, msg.MessageWithMeta.Message.Header) if err != nil { + log.Error("failed to parse espresso message", "err", err) return s.config().EspressoTxnsPollingInterval } msg.MessageWithMeta.Message = &newMsg batch := s.db.NewBatch() err = s.writeMessage(submittedTxnPos, *msg, batch) if err != nil { + log.Warn("failed to write the submitted txn pos to db ", "err", err) return s.config().EspressoTxnsPollingInterval } err = s.setEspressoSubmittedPos(batch, nil) if err != nil { + log.Warn("failed to set the submitted pos to nil", "err", err) return s.config().EspressoTxnsPollingInterval } err = s.setEspressoSubmittedHash(batch, nil) if err != nil { + log.Warn("failed to set the submitted hash to nil", "err", err) return s.config().EspressoTxnsPollingInterval } @@ -1274,7 +1282,7 @@ func (s *TransactionStreamer) getEspressoSubmittedPos() (arbutil.MessageIndex, e return 0, err } - var pos uint64 + var pos arbutil.MessageIndex err = rlp.DecodeBytes(posBytes, &pos) if err != nil { @@ -1297,17 +1305,17 @@ func (s *TransactionStreamer) getEspressoSubmittedHash() (*espressoTypes.TaggedB return &hash, nil } -func (s *TransactionStreamer) getEspressoPendingTxnsPos() (*[]arbutil.MessageIndex, error) { - pendingTxnsBytes, err := s.db.Get(espressoPendingTxnsPos) +func (s *TransactionStreamer) getEspressoPendingTxnsPos() ([]*arbutil.MessageIndex, error) { + pendingTxnsBytes, err := s.db.Get(espressoPendingTxnsPositions) if err != nil { return nil, err } - var pendingTxnsPos []arbutil.MessageIndex + var pendingTxnsPos []*arbutil.MessageIndex err = rlp.DecodeBytes(pendingTxnsBytes, &pendingTxnsPos) if err != nil { return nil, err } - return &pendingTxnsPos, nil + return pendingTxnsPos, nil } func (s *TransactionStreamer) setEspressoSubmittedPos(batch ethdb.KeyValueWriter, pos *arbutil.MessageIndex) error { @@ -1348,9 +1356,9 @@ func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWrite return nil } -func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWriter, pos *[]arbutil.MessageIndex) error { +func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWriter, pos []*arbutil.MessageIndex) error { if pos == nil { - err := batch.Delete(espressoPendingTxnsPos) + err := batch.Delete(espressoPendingTxnsPositions) return err } @@ -1358,7 +1366,7 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit if err != nil { return err } - err = batch.Put(espressoPendingTxnsPos, posBytes) + err = batch.Put(espressoPendingTxnsPositions, posBytes) if err != nil { return err @@ -1373,7 +1381,7 @@ func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIn return err } - *pendingTxnsPos = append(*pendingTxnsPos, pos) + pendingTxnsPos = append(pendingTxnsPos, &pos) err = s.setEspressoPendingTxnsPos(s.db.NewBatch(), pendingTxnsPos) if err != nil { log.Error("failed to set the pending txns", "err", err) @@ -1387,7 +1395,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig _, err := s.getEspressoSubmittedPos() if err != nil { - log.Error("failed to get espresso submitted pos", "err", err) + log.Warn("submitted pos not found", "err", err) return s.config().EspressoTxnsPollingInterval } @@ -1395,16 +1403,15 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig return s.config().EspressoTxnsPollingInterval } - s.pendingTxnsQueueMutex.Lock() - defer s.pendingTxnsQueueMutex.Unlock() pendingTxnsPos, err := s.getEspressoPendingTxnsPos() - if err != nil || pendingTxnsPos == nil { - log.Error("failed to get pending txns", "err", err) + if err != nil { + log.Warn("failed to get pending txns", "err", err) return s.config().EspressoTxnsPollingInterval } - if len(*pendingTxnsPos) > 0 { + + if len(pendingTxnsPos) > 0 { // get the message at the pending txn position - msg, err := s.GetMessage((*pendingTxnsPos)[0]) + msg, err := s.GetMessage(*pendingTxnsPos[0]) if err != nil { log.Error("failed to get espresso submitted pos", "err", err) return s.config().EspressoTxnsPollingInterval @@ -1420,7 +1427,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig Namespace: s.config().EspressoNamespace, } - log.Info("Submitting transaction to espresso using sovereign sequencer", "tx", espressoTx) + log.Info("submitting transaction to espresso using sovereign sequencer", "tx", espressoTx) hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{ Payload: bytes[0], @@ -1431,13 +1438,14 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig log.Error("failed to submit transaction to espresso", "err", err) return s.config().EspressoTxnsPollingInterval } + batch := s.db.NewBatch() - err = s.setEspressoSubmittedPos(batch, &(*pendingTxnsPos)[0]) + err = s.setEspressoSubmittedPos(batch, pendingTxnsPos[0]) if err != nil { log.Error("failed to set the submitted txn pos", "err", err) return s.config().EspressoTxnsPollingInterval } - *pendingTxnsPos = (*pendingTxnsPos)[1:] + pendingTxnsPos = pendingTxnsPos[1:] err = s.setEspressoPendingTxnsPos(batch, pendingTxnsPos) if err != nil { log.Error("failed to set the pending txns", "err", err) diff --git a/system_tests/espresso_sovereign_sequencer_test.go b/system_tests/espresso_sovereign_sequencer_test.go index 205bc92f6b..8b38b4a26d 100644 --- a/system_tests/espresso_sovereign_sequencer_test.go +++ b/system_tests/espresso_sovereign_sequencer_test.go @@ -91,3 +91,40 @@ func TestSovereignSequencer(t *testing.T) { }) Require(t, err) } + +func TestSovereignSequencerSendMultipleTxns(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + valNodeCleanup := createValidationNode(ctx, t, true) + defer valNodeCleanup() + + l2Node, l2Info, cleanup := createL1AndL2Node(ctx, t) + defer cleanup() + + err := waitForL1Node(t, ctx) + Require(t, err) + + cleanEspresso := runEspresso(t, ctx) + defer cleanEspresso() + + // wait for the builder + err = waitForEspressoNode(t, ctx) + Require(t, err) + + err = checkTransferTxOnL2(t, ctx, l2Node, "User14", l2Info) + Require(t, err) + err = checkTransferTxOnL2(t, ctx, l2Node, "User15", l2Info) + Require(t, err) + err = checkTransferTxOnL2(t, ctx, l2Node, "User16", l2Info) + Require(t, err) + + msgCnt, err := l2Node.ConsensusNode.TxStreamer.GetMessageCount() + Require(t, err) + + err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool { + validatedCnt := l2Node.ConsensusNode.BlockValidator.Validated(t) + return validatedCnt == msgCnt + }) + Require(t, err) +} From 0d68de8cceb40a38b7d9c45dc7e106a9dd0ae75f Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Mon, 5 Aug 2024 16:47:43 -0400 Subject: [PATCH 03/11] add unit test to ci and add mutex --- .github/workflows/espresso-e2e.yml | 6 ++++++ arbnode/transaction_streamer.go | 15 +++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/.github/workflows/espresso-e2e.yml b/.github/workflows/espresso-e2e.yml index 8a8c8b930d..2e03e65198 100644 --- a/.github/workflows/espresso-e2e.yml +++ b/.github/workflows/espresso-e2e.yml @@ -123,6 +123,7 @@ jobs: if: steps.cache-cbrotli.outputs.cache-hit != 'true' run: ./scripts/build-brotli.sh -w -d + - name: Build run: make build build-replay-env -j @@ -140,3 +141,8 @@ jobs: run: | packages=`go list ./... | grep system_tests` gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencer' + + - name: Run sovereign sequencer test with multiple transactions + run: | + packages=`go list ./... | grep system_tests` + gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencerMultiTx' \ No newline at end of file diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 961ae9ece1..6fff521e4c 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -58,8 +58,9 @@ type TransactionStreamer struct { config TransactionStreamerConfigFetcher snapSyncConfig *SnapSyncConfig - insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held - reorgMutex sync.RWMutex + insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held + reorgMutex sync.RWMutex + espressoTxnsStateInsertionMutex sync.Mutex newMessageNotifier chan struct{} newSovereignTxNotifier chan struct{} @@ -1020,6 +1021,8 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) + s.espressoTxnsStateInsertionMutex.Lock() + defer s.espressoTxnsStateInsertionMutex.Unlock() err = s.SubmitEspressoTransactionPos(pos) if err != nil { return err @@ -1251,6 +1254,10 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co return s.config().EspressoTxnsPollingInterval } msg.MessageWithMeta.Message = &newMsg + + s.espressoTxnsStateInsertionMutex.Lock() + defer s.espressoTxnsStateInsertionMutex.Unlock() + batch := s.db.NewBatch() err = s.writeMessage(submittedTxnPos, *msg, batch) if err != nil { @@ -1375,6 +1382,7 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit } func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) error { + pendingTxnsPos, err := s.getEspressoPendingTxnsPos() if err != nil { log.Error("failed to get the pending txns", "err", err) @@ -1439,6 +1447,9 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig return s.config().EspressoTxnsPollingInterval } + s.espressoTxnsStateInsertionMutex.Lock() + defer s.espressoTxnsStateInsertionMutex.Unlock() + batch := s.db.NewBatch() err = s.setEspressoSubmittedPos(batch, pendingTxnsPos[0]) if err != nil { From 491e64afd83a38745cc1df9e0a51fc69229e55a0 Mon Sep 17 00:00:00 2001 From: ImJeremyHe Date: Mon, 5 Aug 2024 10:37:59 +0800 Subject: [PATCH 04/11] Optional header in jst --- arbnode/batch_poster.go | 3 +-- arbnode/transaction_streamer.go | 2 +- arbos/arbostypes/incomingmessage.go | 2 +- arbos/block_processor.go | 2 +- arbos/parse_l2_test.go | 2 +- execution/gethexec/espresso_sequencer.go | 2 +- execution/gethexec/executionengine.go | 6 ++---- 7 files changed, 8 insertions(+), 11 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 62546dda71..3d03349f74 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -502,8 +502,7 @@ func (b *BatchPoster) addEspressoBlockMerkleProof( return err } - if jst.Header.Height == 0 { - // This means the header in the jst is still the dummy header. + if jst.Header == nil { return fmt.Errorf("this msg has not been included in hotshot %v", jst.Header.Height) } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 6fff521e4c..2d0d91ec47 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -1243,7 +1243,7 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co } // Filling in the block justification with the header - jst.Header = espressoHeader + jst.Header = &espressoHeader jst.Proof = &resp.Proof jst.VidCommon = &resp.VidCommon diff --git a/arbos/arbostypes/incomingmessage.go b/arbos/arbostypes/incomingmessage.go index 84cee005de..a4bc455e01 100644 --- a/arbos/arbostypes/incomingmessage.go +++ b/arbos/arbostypes/incomingmessage.go @@ -43,7 +43,7 @@ type BlockMerkleJustification struct { } type EspressoBlockJustification struct { - Header espressoTypes.Header + Header *espressoTypes.Header Proof *espressoTypes.NamespaceProof VidCommon *espressoTypes.VidCommon BlockMerkleJustification *BlockMerkleJustification diff --git a/arbos/block_processor.go b/arbos/block_processor.go index 4b80a91861..b9ac1cbfe0 100644 --- a/arbos/block_processor.go +++ b/arbos/block_processor.go @@ -188,7 +188,7 @@ func ProduceBlock( if err != nil { return nil, nil, err } - espressoHeader = &jst.Header + espressoHeader = jst.Header } } diff --git a/arbos/parse_l2_test.go b/arbos/parse_l2_test.go index b649f2a1f8..3c3069275e 100644 --- a/arbos/parse_l2_test.go +++ b/arbos/parse_l2_test.go @@ -33,7 +33,7 @@ func TestEspressoParsing(t *testing.T) { root, err := tagged_base64.New("root", []byte{4, 5, 6}) Require(t, err) expectJst := &arbostypes.EspressoBlockJustification{ - Header: espressoTypes.Header{ + Header: &espressoTypes.Header{ L1Head: 1, ChainConfig: mockChainConfig, Timestamp: 2, diff --git a/execution/gethexec/espresso_sequencer.go b/execution/gethexec/espresso_sequencer.go index 71902aa090..f28f7380f1 100644 --- a/execution/gethexec/espresso_sequencer.go +++ b/execution/gethexec/espresso_sequencer.go @@ -92,7 +92,7 @@ func (s *EspressoSequencer) createBlock(ctx context.Context) (returnValue bool) } jst := &arbostypes.EspressoBlockJustification{ - Header: header, + Header: &header, VidCommon: &arbTxns.VidCommon, Proof: &arbTxns.Proof, } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 0e08da9137..623aa5225b 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -289,9 +289,7 @@ func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Tran l2Message = append(l2Message, arbos.L2MessageKind_EspressoSovereignTx) // Set a block justification placeholder here. That would help us easily parse // our messages from `ParseEspressoMessage`. - jstBytes, err := arbos.GetEspressoJstBytes(&arbostypes.EspressoBlockJustification{ - Header: espressoTypes.GetDummyHeader(), - }) + jstBytes, err := arbos.GetEspressoJstBytes(&arbostypes.EspressoBlockJustification{}) if err != nil { return nil, err } @@ -473,7 +471,7 @@ func (s *ExecutionEngine) SequenceTransactionsEspresso( s.bc, s.bc.Config(), hooks, - &jst.Header, + jst.Header, false, ) if err != nil { From d8edc87b94eb741726d84e7557c27c11c62c1fc3 Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Tue, 6 Aug 2024 11:13:43 -0400 Subject: [PATCH 05/11] initialize pending transactions if not present --- arbnode/transaction_streamer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 6fff521e4c..287732ccee 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -143,6 +143,7 @@ func NewTransactionStreamer( if config().SovereignSequencerEnabled { espressoClient := espressoClient.NewClient(config().HotShotUrl) streamer.espressoClient = espressoClient + } err := streamer.cleanupInconsistentState() @@ -1384,12 +1385,18 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) error { pendingTxnsPos, err := s.getEspressoPendingTxnsPos() - if err != nil { + if err != nil && !strings.Contains(err.Error(), "not found") { log.Error("failed to get the pending txns", "err", err) return err + } + if strings.Contains(err.Error(), "not found") { + // if the key doesn't exist, create a new array with the pos + pendingTxnsPos = []*arbutil.MessageIndex{&pos} + } else { + pendingTxnsPos = append(pendingTxnsPos, &pos) } - pendingTxnsPos = append(pendingTxnsPos, &pos) + err = s.setEspressoPendingTxnsPos(s.db.NewBatch(), pendingTxnsPos) if err != nil { log.Error("failed to set the pending txns", "err", err) From c4999a2851c9ab350574258aa2aed83bf3b6be19 Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Tue, 6 Aug 2024 11:16:41 -0400 Subject: [PATCH 06/11] remove unwanted test --- .github/workflows/espresso-e2e.yml | 5 --- .../espresso_sovereign_sequencer_test.go | 37 ------------------- 2 files changed, 42 deletions(-) diff --git a/.github/workflows/espresso-e2e.yml b/.github/workflows/espresso-e2e.yml index 2e03e65198..de8d7afd8c 100644 --- a/.github/workflows/espresso-e2e.yml +++ b/.github/workflows/espresso-e2e.yml @@ -141,8 +141,3 @@ jobs: run: | packages=`go list ./... | grep system_tests` gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencer' - - - name: Run sovereign sequencer test with multiple transactions - run: | - packages=`go list ./... | grep system_tests` - gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencerMultiTx' \ No newline at end of file diff --git a/system_tests/espresso_sovereign_sequencer_test.go b/system_tests/espresso_sovereign_sequencer_test.go index 8b38b4a26d..205bc92f6b 100644 --- a/system_tests/espresso_sovereign_sequencer_test.go +++ b/system_tests/espresso_sovereign_sequencer_test.go @@ -91,40 +91,3 @@ func TestSovereignSequencer(t *testing.T) { }) Require(t, err) } - -func TestSovereignSequencerSendMultipleTxns(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - valNodeCleanup := createValidationNode(ctx, t, true) - defer valNodeCleanup() - - l2Node, l2Info, cleanup := createL1AndL2Node(ctx, t) - defer cleanup() - - err := waitForL1Node(t, ctx) - Require(t, err) - - cleanEspresso := runEspresso(t, ctx) - defer cleanEspresso() - - // wait for the builder - err = waitForEspressoNode(t, ctx) - Require(t, err) - - err = checkTransferTxOnL2(t, ctx, l2Node, "User14", l2Info) - Require(t, err) - err = checkTransferTxOnL2(t, ctx, l2Node, "User15", l2Info) - Require(t, err) - err = checkTransferTxOnL2(t, ctx, l2Node, "User16", l2Info) - Require(t, err) - - msgCnt, err := l2Node.ConsensusNode.TxStreamer.GetMessageCount() - Require(t, err) - - err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool { - validatedCnt := l2Node.ConsensusNode.BlockValidator.Validated(t) - return validatedCnt == msgCnt - }) - Require(t, err) -} From 1f9bc0c6b62191062a896fce6a34d32a344a349b Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Tue, 6 Aug 2024 15:26:16 -0400 Subject: [PATCH 07/11] resolve bugs --- arbnode/transaction_streamer.go | 62 ++++++++++++++++++++------------- go.mod | 2 +- go.sum | 12 ++----- 3 files changed, 41 insertions(+), 35 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index d9c5970cf7..8b18c214a9 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -9,6 +9,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64" "math/big" "reflect" "strings" @@ -1016,18 +1017,23 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( MessageWithMeta: msgWithMeta, BlockHash: &msgResult.BlockHash, } - - if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil { + batch := s.db.NewBatch() + if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, batch); err != nil { return err } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) s.espressoTxnsStateInsertionMutex.Lock() defer s.espressoTxnsStateInsertionMutex.Unlock() - err = s.SubmitEspressoTransactionPos(pos) + err = s.SubmitEspressoTransactionPos(pos, batch) if err != nil { return err } + err = batch.Write() + if err != nil { + return err + + } return nil } @@ -1212,9 +1218,9 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co log.Warn("submitted hash not found", "err", err) return s.config().EspressoTxnsPollingInterval } - data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash) + data, err := s.espressoClient.FetchTransactionByHash(ctx, &submittedTxHash) if err != nil { - log.Error("failed to fetch the submitted transaction hash", "err", err, "pos", submittedTxnPos) + log.Error("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String()) return s.config().EspressoTxnsPollingInterval } // get the message at the submitted txn position @@ -1270,7 +1276,7 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co log.Warn("failed to set the submitted pos to nil", "err", err) return s.config().EspressoTxnsPollingInterval } - err = s.setEspressoSubmittedHash(batch, nil) + err = s.setEspressoSubmittedHash(batch, tagged_base64.TaggedBase64{}) if err != nil { log.Warn("failed to set the submitted hash to nil", "err", err) return s.config().EspressoTxnsPollingInterval @@ -1278,6 +1284,7 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co err = batch.Write() if err != nil { + log.Error("failed to write to db", "err", err) return s.config().EspressoTxnsPollingInterval } return time.Duration(0) @@ -1300,17 +1307,21 @@ func (s *TransactionStreamer) getEspressoSubmittedPos() (arbutil.MessageIndex, e return arbutil.MessageIndex(pos), nil } -func (s *TransactionStreamer) getEspressoSubmittedHash() (*espressoTypes.TaggedBase64, error) { +func (s *TransactionStreamer) getEspressoSubmittedHash() (espressoTypes.TaggedBase64, error) { posBytes, err := s.db.Get(espressoSubmittedHash) if err != nil { - return nil, err + return espressoTypes.TaggedBase64{}, err } - var hash espressoTypes.TaggedBase64 + var hash string err = rlp.DecodeBytes(posBytes, &hash) if err != nil { - return nil, err + return espressoTypes.TaggedBase64{}, err } - return &hash, nil + hashParsed, err := tagged_base64.Parse(hash) + if hashParsed == nil { + return espressoTypes.TaggedBase64{}, err + } + return espressoTypes.TaggedBase64(*hashParsed), nil } func (s *TransactionStreamer) getEspressoPendingTxnsPos() ([]*arbutil.MessageIndex, error) { @@ -1345,14 +1356,14 @@ func (s *TransactionStreamer) setEspressoSubmittedPos(batch ethdb.KeyValueWriter return nil } -func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWriter, hash *espressoTypes.TaggedBase64) error { +func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWriter, hash espressoTypes.TaggedBase64) error { // if hash is nil, delete the key - if hash == nil { + if hash.Value() == nil { err := batch.Delete(espressoSubmittedHash) return err } - hashBytes, err := rlp.EncodeToBytes(hash) + hashBytes, err := rlp.EncodeToBytes(hash.String()) if err != nil { return err } @@ -1382,22 +1393,20 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit return nil } -func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) error { - +func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex, batch ethdb.Batch) error { pendingTxnsPos, err := s.getEspressoPendingTxnsPos() if err != nil && !strings.Contains(err.Error(), "not found") { log.Error("failed to get the pending txns", "err", err) return err } - if strings.Contains(err.Error(), "not found") { + if err != nil && strings.Contains(err.Error(), "not found") { // if the key doesn't exist, create a new array with the pos pendingTxnsPos = []*arbutil.MessageIndex{&pos} } else { pendingTxnsPos = append(pendingTxnsPos, &pos) } - - err = s.setEspressoPendingTxnsPos(s.db.NewBatch(), pendingTxnsPos) + err = s.setEspressoPendingTxnsPos(batch, pendingTxnsPos) if err != nil { log.Error("failed to set the pending txns", "err", err) return err @@ -1409,13 +1418,16 @@ func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIn func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ignored struct{}) time.Duration { _, err := s.getEspressoSubmittedPos() - if err != nil { - log.Warn("submitted pos not found", "err", err) + + if err != nil && !strings.Contains(err.Error(), "not found") { + log.Warn("error getting submitted pos", "err", err) return s.config().EspressoTxnsPollingInterval } - if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) { - return s.config().EspressoTxnsPollingInterval + if err == nil { + if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) { + return s.config().EspressoTxnsPollingInterval + } } pendingTxnsPos, err := s.getEspressoPendingTxnsPos() @@ -1469,13 +1481,15 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig log.Error("failed to set the pending txns", "err", err) return s.config().EspressoTxnsPollingInterval } - err = s.setEspressoSubmittedHash(batch, hash) + err = s.setEspressoSubmittedHash(batch, *hash) if err != nil { log.Error("failed to set the submitted hash", "err", err) return s.config().EspressoTxnsPollingInterval } + err = batch.Write() if err != nil { + log.Error("failed to write to db", "err", err) return s.config().EspressoTxnsPollingInterval } } diff --git a/go.mod b/go.mod index 471845c4ac..6053596447 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ replace github.com/VictoriaMetrics/fastcache => ./fastcache replace github.com/ethereum/go-ethereum => ./go-ethereum require ( - github.com/EspressoSystems/espresso-sequencer-go v0.0.22 + github.com/EspressoSystems/espresso-sequencer-go v0.0.23 github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/alicebob/miniredis/v2 v2.32.1 diff --git a/go.sum b/go.sum index 7fc90570ec..5d7dae46c5 100644 --- a/go.sum +++ b/go.sum @@ -7,16 +7,8 @@ github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3 github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/EspressoSystems/espresso-sequencer-go v0.0.21 h1:pHLdf8qfIGMNLX3QjoPoYzJ+LUgKf30ipd+Pxcypsaw= -github.com/EspressoSystems/espresso-sequencer-go v0.0.21/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801025250-0cf3306d9e7d h1:D1HT+1YLHaYgLMbWv+tQLKpqcrxUoZnfxm20dyGJDGI= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801025250-0cf3306d9e7d/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801030455-3dd69df599ff h1:BfsC+hnZNkKVSfCothZvRZFoyreN1DkzScpjsyDuAO8= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801030455-3dd69df599ff/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801062726-7dad83d7b5b2 h1:S2dkecHFNPbonPIeoSaZEv40dDYKaW9ukM31EpsGaPs= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22-0.20240801062726-7dad83d7b5b2/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22 h1:eroPo3SGvRAl7S5hha/bSYJZOTR7lFnDlKX5z4Ave48= -github.com/EspressoSystems/espresso-sequencer-go v0.0.22/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= +github.com/EspressoSystems/espresso-sequencer-go v0.0.23 h1:3fvNU1fvoH6eBZFleaUGl6AVcWhd+H3/paKHnYHKfdM= +github.com/EspressoSystems/espresso-sequencer-go v0.0.23/go.mod h1:BbU8N23RGl45QXSf/bYc8OQ8TG/vlMaPC1GU1acqKmc= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= From b69c47f64d28dabe5bb1e286a7dd1db84b613f4d Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Tue, 6 Aug 2024 18:04:01 -0400 Subject: [PATCH 08/11] remove nil pointer panic --- arbnode/batch_poster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 3d03349f74..7535cc3d09 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -503,7 +503,7 @@ func (b *BatchPoster) addEspressoBlockMerkleProof( } if jst.Header == nil { - return fmt.Errorf("this msg has not been included in hotshot %v", jst.Header.Height) + return fmt.Errorf("this msg has not been included in hotshot") } snapshot, err := b.lightClientReader.FetchMerkleRoot(jst.Header.Height, nil) From a048145b6d7588b7d57caf8b7a5857874b9efa38 Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Tue, 6 Aug 2024 18:09:27 -0400 Subject: [PATCH 09/11] cleanup --- arbnode/transaction_streamer.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 8b18c214a9..5761fa65b3 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -12,7 +12,6 @@ import ( tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64" "math/big" "reflect" - "strings" "sync" "sync/atomic" "testing" @@ -349,7 +348,7 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde // oldMessage, accumulator stored in tracker, and the message re-read from l1 expectedAcc, err := s.inboxReader.tracker.GetDelayedAcc(delayedSeqNum) if err != nil { - if !strings.Contains(err.Error(), "not found") { + if !isErrNotFound(err) { log.Error("reorg-resequence: failed to read expected accumulator", "err", err) } continue @@ -1395,12 +1394,12 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex, batch ethdb.Batch) error { pendingTxnsPos, err := s.getEspressoPendingTxnsPos() - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !isErrNotFound(err) { log.Error("failed to get the pending txns", "err", err) return err } - if err != nil && strings.Contains(err.Error(), "not found") { + if err != nil && isErrNotFound(err) { // if the key doesn't exist, create a new array with the pos pendingTxnsPos = []*arbutil.MessageIndex{&pos} } else { @@ -1419,7 +1418,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig _, err := s.getEspressoSubmittedPos() - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !isErrNotFound(err) { log.Warn("error getting submitted pos", "err", err) return s.config().EspressoTxnsPollingInterval } From 04ca352b2e907061804c8c1575a29b428cd54715 Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Tue, 6 Aug 2024 19:47:58 -0400 Subject: [PATCH 10/11] revert to strings.contains --- arbnode/transaction_streamer.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 5761fa65b3..3913005484 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -12,6 +12,7 @@ import ( tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64" "math/big" "reflect" + "strings" "sync" "sync/atomic" "testing" @@ -348,7 +349,7 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde // oldMessage, accumulator stored in tracker, and the message re-read from l1 expectedAcc, err := s.inboxReader.tracker.GetDelayedAcc(delayedSeqNum) if err != nil { - if !isErrNotFound(err) { + if !strings.Contains(err.Error(), "not found") { log.Error("reorg-resequence: failed to read expected accumulator", "err", err) } continue @@ -476,7 +477,7 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. return nil, err } blockHash = blockHashDBVal.BlockHash - } else if !isErrNotFound(err) { + } else if !strings.Contains(err.Error(), "not found") { return nil, err } @@ -622,7 +623,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if broadcastStartPos > 0 { _, err := s.GetMessage(broadcastStartPos - 1) if err != nil { - if !isErrNotFound(err) { + if !strings.Contains(err.Error(), "not found") { return err } // Message before current message doesn't exist in database, so don't add current messages yet @@ -1394,12 +1395,12 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex, batch ethdb.Batch) error { pendingTxnsPos, err := s.getEspressoPendingTxnsPos() - if err != nil && !isErrNotFound(err) { + if err != nil && !strings.Contains(err.Error(), "not found") { log.Error("failed to get the pending txns", "err", err) return err } - if err != nil && isErrNotFound(err) { + if err != nil && strings.Contains(err.Error(), "not found") { // if the key doesn't exist, create a new array with the pos pendingTxnsPos = []*arbutil.MessageIndex{&pos} } else { @@ -1418,7 +1419,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig _, err := s.getEspressoSubmittedPos() - if err != nil && !isErrNotFound(err) { + if err != nil && !strings.Contains(err.Error(), "not found") { log.Warn("error getting submitted pos", "err", err) return s.config().EspressoTxnsPollingInterval } From 3da94ffebe9efc2c0e7844e2be9af7c985675e5e Mon Sep 17 00:00:00 2001 From: Sneh Koul Date: Tue, 6 Aug 2024 19:49:48 -0400 Subject: [PATCH 11/11] fix isErrNotFound --- arbnode/transaction_streamer.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 3913005484..1630fbbf87 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -10,9 +10,9 @@ import ( "encoding/json" "fmt" tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64" + "github.com/ethereum/go-ethereum/ethdb/memorydb" "math/big" "reflect" - "strings" "sync" "sync/atomic" "testing" @@ -349,7 +349,7 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde // oldMessage, accumulator stored in tracker, and the message re-read from l1 expectedAcc, err := s.inboxReader.tracker.GetDelayedAcc(delayedSeqNum) if err != nil { - if !strings.Contains(err.Error(), "not found") { + if !isErrNotFound(err) { log.Error("reorg-resequence: failed to read expected accumulator", "err", err) } continue @@ -439,7 +439,7 @@ func dbKey(prefix []byte, pos uint64) []byte { } func isErrNotFound(err error) bool { - return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, pebble.ErrNotFound) + return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, pebble.ErrNotFound) || errors.Is(err, memorydb.ErrMemorydbNotFound) } // Note: if changed to acquire the mutex, some internal users may need to be updated to a non-locking version. @@ -477,7 +477,7 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. return nil, err } blockHash = blockHashDBVal.BlockHash - } else if !strings.Contains(err.Error(), "not found") { + } else if !isErrNotFound(err) { return nil, err } @@ -623,7 +623,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if broadcastStartPos > 0 { _, err := s.GetMessage(broadcastStartPos - 1) if err != nil { - if !strings.Contains(err.Error(), "not found") { + if !isErrNotFound(err) { return err } // Message before current message doesn't exist in database, so don't add current messages yet @@ -1395,12 +1395,12 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex, batch ethdb.Batch) error { pendingTxnsPos, err := s.getEspressoPendingTxnsPos() - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !isErrNotFound(err) { log.Error("failed to get the pending txns", "err", err) return err } - if err != nil && strings.Contains(err.Error(), "not found") { + if err != nil && isErrNotFound(err) { // if the key doesn't exist, create a new array with the pos pendingTxnsPos = []*arbutil.MessageIndex{&pos} } else { @@ -1419,7 +1419,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig _, err := s.getEspressoSubmittedPos() - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !isErrNotFound(err) { log.Warn("error getting submitted pos", "err", err) return s.config().EspressoTxnsPollingInterval }