diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 3913005484..1630fbbf87 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -10,9 +10,9 @@ import ( "encoding/json" "fmt" tagged_base64 "github.com/EspressoSystems/espresso-sequencer-go/tagged-base64" + "github.com/ethereum/go-ethereum/ethdb/memorydb" "math/big" "reflect" - "strings" "sync" "sync/atomic" "testing" @@ -349,7 +349,7 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde // oldMessage, accumulator stored in tracker, and the message re-read from l1 expectedAcc, err := s.inboxReader.tracker.GetDelayedAcc(delayedSeqNum) if err != nil { - if !strings.Contains(err.Error(), "not found") { + if !isErrNotFound(err) { log.Error("reorg-resequence: failed to read expected accumulator", "err", err) } continue @@ -439,7 +439,7 @@ func dbKey(prefix []byte, pos uint64) []byte { } func isErrNotFound(err error) bool { - return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, pebble.ErrNotFound) + return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, pebble.ErrNotFound) || errors.Is(err, memorydb.ErrMemorydbNotFound) } // Note: if changed to acquire the mutex, some internal users may need to be updated to a non-locking version. @@ -477,7 +477,7 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. return nil, err } blockHash = blockHashDBVal.BlockHash - } else if !strings.Contains(err.Error(), "not found") { + } else if !isErrNotFound(err) { return nil, err } @@ -623,7 +623,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if broadcastStartPos > 0 { _, err := s.GetMessage(broadcastStartPos - 1) if err != nil { - if !strings.Contains(err.Error(), "not found") { + if !isErrNotFound(err) { return err } // Message before current message doesn't exist in database, so don't add current messages yet @@ -1395,12 +1395,12 @@ func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWrit func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex, batch ethdb.Batch) error { pendingTxnsPos, err := s.getEspressoPendingTxnsPos() - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !isErrNotFound(err) { log.Error("failed to get the pending txns", "err", err) return err } - if err != nil && strings.Contains(err.Error(), "not found") { + if err != nil && isErrNotFound(err) { // if the key doesn't exist, create a new array with the pos pendingTxnsPos = []*arbutil.MessageIndex{&pos} } else { @@ -1419,7 +1419,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig _, err := s.getEspressoSubmittedPos() - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !isErrNotFound(err) { log.Warn("error getting submitted pos", "err", err) return s.config().EspressoTxnsPollingInterval }