Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent storage for sovereign builder #180

Merged
merged 12 commits into from
Aug 7, 2024
6 changes: 6 additions & 0 deletions .github/workflows/espresso-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ jobs:
if: steps.cache-cbrotli.outputs.cache-hit != 'true'
run: ./scripts/build-brotli.sh -w -d


- name: Build
run: make build build-replay-env -j

Expand All @@ -140,3 +141,8 @@ jobs:
run: |
packages=`go list ./... | grep system_tests`
gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencer'

- name: Run sovereign sequencer test with multiple transactions
run: |
packages=`go list ./... | grep system_tests`
gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 -- -v -timeout 15m -p 1 ./... -run 'TestSovereignSequencerMultiTx'
11 changes: 7 additions & 4 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ var (
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
espressoSubmittedPos []byte = []byte("_espressoSubmittedPos") // contains the current message index of the last submitted txn
espressoSubmittedHash []byte = []byte("_espressoSubmittedHash") // contains the hash of the last submitted txn
espressoPendingTxnsPositions []byte = []byte("_espressoPendingTxnsPositions") // contains the index of the pending txns that need to be submitted to espresso
)

const currentDbSchemaVersion uint64 = 1
237 changes: 203 additions & 34 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ type TransactionStreamer struct {
config TransactionStreamerConfigFetcher
snapSyncConfig *SnapSyncConfig

insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex
insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex
espressoTxnsStateInsertionMutex sync.Mutex

newMessageNotifier chan struct{}
newSovereignTxNotifier chan struct{}
Expand All @@ -75,11 +76,6 @@ type TransactionStreamer struct {
inboxReader *InboxReader
delayedBridge *DelayedBridge
espressoClient *espressoClient.Client

pendingTxnsQueueMutex sync.Mutex // cannot be acquired while reorgMutex is held
pendingTxnsPos []arbutil.MessageIndex
submittedTxnPos *arbutil.MessageIndex
submittedTxHash *espressoTypes.TaggedBase64
}

type TransactionStreamerConfig struct {
Expand Down Expand Up @@ -1025,7 +1021,12 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
}

s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
s.SubmitEspressoTransactionPos(pos)
s.espressoTxnsStateInsertionMutex.Lock()
defer s.espressoTxnsStateInsertionMutex.Unlock()
err = s.SubmitEspressoTransactionPos(pos)
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -1199,23 +1200,26 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
return s.config().ExecuteMessageLoopDelay
}

func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) {
s.pendingTxnsQueueMutex.Lock()
defer s.pendingTxnsQueueMutex.Unlock()
s.pendingTxnsPos = append(s.pendingTxnsPos, pos)
}

func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Context) time.Duration {

data, err := s.espressoClient.FetchTransactionByHash(ctx, s.submittedTxHash)
submittedTxnPos, err := s.getEspressoSubmittedPos()
if err != nil {
log.Error("failed to fetch the transaction hash", "err", err, "pos", s.submittedTxnPos)
log.Warn("submitted pos not found", "err", err)
return s.config().EspressoTxnsPollingInterval
}
submittedTxHash, err := s.getEspressoSubmittedHash()
if err != nil {
log.Warn("submitted hash not found", "err", err)
return s.config().EspressoTxnsPollingInterval
}
data, err := s.espressoClient.FetchTransactionByHash(ctx, submittedTxHash)
if err != nil {
log.Error("failed to fetch the submitted transaction hash", "err", err, "pos", submittedTxnPos)
return s.config().EspressoTxnsPollingInterval
}
// get the message at the submitted txn position
msg, err := s.getMessageWithMetadataAndBlockHash(*s.submittedTxnPos)
msg, err := s.getMessageWithMetadataAndBlockHash(submittedTxnPos)
if err != nil {
log.Error("failed to get espresso message", "err", err)
log.Error("failed to get espresso message at submitted txn pos", "err", err)
return s.config().EspressoTxnsPollingInterval
}
// parse the message to get the transaction bytes and the justification
Expand All @@ -1242,38 +1246,180 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co
jst.Header = espressoHeader
jst.Proof = &resp.Proof
jst.VidCommon = &resp.VidCommon

// create a new message with the header and the txn and the updated block justification
newMsg, err := arbos.MessageFromEspressoSovereignTx(txns[0], jst, msg.MessageWithMeta.Message.Header)
if err != nil {
log.Error("failed to parse espresso message", "err", err)
return s.config().EspressoTxnsPollingInterval
}
msg.MessageWithMeta.Message = &newMsg

s.espressoTxnsStateInsertionMutex.Lock()
defer s.espressoTxnsStateInsertionMutex.Unlock()

batch := s.db.NewBatch()
err = s.writeMessage(*s.submittedTxnPos, *msg, batch)
err = s.writeMessage(submittedTxnPos, *msg, batch)
if err != nil {
log.Warn("failed to write the submitted txn pos to db ", "err", err)
return s.config().EspressoTxnsPollingInterval
}
err = s.setEspressoSubmittedPos(batch, nil)
if err != nil {
log.Warn("failed to set the submitted pos to nil", "err", err)
return s.config().EspressoTxnsPollingInterval
}
err = s.setEspressoSubmittedHash(batch, nil)
if err != nil {
log.Warn("failed to set the submitted hash to nil", "err", err)
return s.config().EspressoTxnsPollingInterval
}

err = batch.Write()
if err != nil {
return s.config().EspressoTxnsPollingInterval
}
s.submittedTxnPos = nil
s.submittedTxHash = nil
return time.Duration(0)
}

func (s *TransactionStreamer) getEspressoSubmittedPos() (arbutil.MessageIndex, error) {

posBytes, err := s.db.Get(espressoSubmittedPos)
if err != nil {
return 0, err
}

var pos arbutil.MessageIndex
err = rlp.DecodeBytes(posBytes, &pos)

if err != nil {
return 0, err
}

return arbutil.MessageIndex(pos), nil
}

func (s *TransactionStreamer) getEspressoSubmittedHash() (*espressoTypes.TaggedBase64, error) {
posBytes, err := s.db.Get(espressoSubmittedHash)
if err != nil {
return nil, err
}
var hash espressoTypes.TaggedBase64
err = rlp.DecodeBytes(posBytes, &hash)
if err != nil {
return nil, err
}
return &hash, nil
}

func (s *TransactionStreamer) getEspressoPendingTxnsPos() ([]*arbutil.MessageIndex, error) {
pendingTxnsBytes, err := s.db.Get(espressoPendingTxnsPositions)
if err != nil {
return nil, err
}
var pendingTxnsPos []*arbutil.MessageIndex
err = rlp.DecodeBytes(pendingTxnsBytes, &pendingTxnsPos)
if err != nil {
return nil, err
}
return pendingTxnsPos, nil
}

func (s *TransactionStreamer) setEspressoSubmittedPos(batch ethdb.KeyValueWriter, pos *arbutil.MessageIndex) error {
// if pos is nil, delete the key
if pos == nil {
err := batch.Delete(espressoSubmittedPos)
return err
}

posBytes, err := rlp.EncodeToBytes(pos)
if err != nil {
return err
}
err = batch.Put(espressoSubmittedPos, posBytes)
if err != nil {
return err
}

return nil
}

func (s *TransactionStreamer) setEspressoSubmittedHash(batch ethdb.KeyValueWriter, hash *espressoTypes.TaggedBase64) error {
// if hash is nil, delete the key
if hash == nil {
err := batch.Delete(espressoSubmittedHash)
return err
}

hashBytes, err := rlp.EncodeToBytes(hash)
if err != nil {
return err
}
err = batch.Put(espressoSubmittedHash, hashBytes)
if err != nil {
return err
}

return nil
}

func (s *TransactionStreamer) setEspressoPendingTxnsPos(batch ethdb.KeyValueWriter, pos []*arbutil.MessageIndex) error {
if pos == nil {
err := batch.Delete(espressoPendingTxnsPositions)
return err
}

posBytes, err := rlp.EncodeToBytes(pos)
if err != nil {
return err
}
err = batch.Put(espressoPendingTxnsPositions, posBytes)
if err != nil {
return err

}
return nil
}

func (s *TransactionStreamer) SubmitEspressoTransactionPos(pos arbutil.MessageIndex) error {

pendingTxnsPos, err := s.getEspressoPendingTxnsPos()
if err != nil {
log.Error("failed to get the pending txns", "err", err)
return err

}
pendingTxnsPos = append(pendingTxnsPos, &pos)
err = s.setEspressoPendingTxnsPos(s.db.NewBatch(), pendingTxnsPos)
if err != nil {
log.Error("failed to set the pending txns", "err", err)
return err
}

return nil
}

func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ignored struct{}) time.Duration {
if s.submittedTxnPos != nil {
if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) {
return s.config().EspressoTxnsPollingInterval
}

_, err := s.getEspressoSubmittedPos()
if err != nil {
log.Warn("submitted pos not found", "err", err)
return s.config().EspressoTxnsPollingInterval
}

if s.PollSubmittedTransactionForFinality(ctx) != time.Duration(0) {
return s.config().EspressoTxnsPollingInterval
}

s.pendingTxnsQueueMutex.Lock()
defer s.pendingTxnsQueueMutex.Unlock()
if s.submittedTxnPos == nil && len(s.pendingTxnsPos) > 0 {
pendingTxnsPos, err := s.getEspressoPendingTxnsPos()
if err != nil {
log.Warn("failed to get pending txns", "err", err)
return s.config().EspressoTxnsPollingInterval
}

if len(pendingTxnsPos) > 0 {
// get the message at the pending txn position
msg, err := s.GetMessage(s.pendingTxnsPos[0])
msg, err := s.GetMessage(*pendingTxnsPos[0])
if err != nil {
log.Error("failed to get espresso submitted pos", "err", err)
return s.config().EspressoTxnsPollingInterval
Expand All @@ -1289,7 +1435,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig
Namespace: s.config().EspressoNamespace,
}

log.Info("Submitting transaction to espresso using sovereign sequencer", "tx", espressoTx)
log.Info("submitting transaction to espresso using sovereign sequencer", "tx", espressoTx)

hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
Payload: bytes[0],
Expand All @@ -1300,10 +1446,33 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig
log.Error("failed to submit transaction to espresso", "err", err)
return s.config().EspressoTxnsPollingInterval
}
s.submittedTxnPos = &s.pendingTxnsPos[0]
s.pendingTxnsPos = s.pendingTxnsPos[1:]
s.submittedTxHash = hash

s.espressoTxnsStateInsertionMutex.Lock()
defer s.espressoTxnsStateInsertionMutex.Unlock()

batch := s.db.NewBatch()
err = s.setEspressoSubmittedPos(batch, pendingTxnsPos[0])
if err != nil {
log.Error("failed to set the submitted txn pos", "err", err)
return s.config().EspressoTxnsPollingInterval
}
pendingTxnsPos = pendingTxnsPos[1:]
err = s.setEspressoPendingTxnsPos(batch, pendingTxnsPos)
if err != nil {
log.Error("failed to set the pending txns", "err", err)
return s.config().EspressoTxnsPollingInterval
}
err = s.setEspressoSubmittedHash(batch, hash)
if err != nil {
log.Error("failed to set the submitted hash", "err", err)
return s.config().EspressoTxnsPollingInterval
}
err = batch.Write()
if err != nil {
return s.config().EspressoTxnsPollingInterval
}
}

return s.config().EspressoTxnsPollingInterval
}

Expand Down
37 changes: 37 additions & 0 deletions system_tests/espresso_sovereign_sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,40 @@ func TestSovereignSequencer(t *testing.T) {
})
Require(t, err)
}

func TestSovereignSequencerSendMultipleTxns(t *testing.T) {
Sneh1999 marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

valNodeCleanup := createValidationNode(ctx, t, true)
defer valNodeCleanup()

l2Node, l2Info, cleanup := createL1AndL2Node(ctx, t)
defer cleanup()

err := waitForL1Node(t, ctx)
Require(t, err)

cleanEspresso := runEspresso(t, ctx)
defer cleanEspresso()

// wait for the builder
err = waitForEspressoNode(t, ctx)
Require(t, err)

err = checkTransferTxOnL2(t, ctx, l2Node, "User14", l2Info)
Require(t, err)
err = checkTransferTxOnL2(t, ctx, l2Node, "User15", l2Info)
Require(t, err)
err = checkTransferTxOnL2(t, ctx, l2Node, "User16", l2Info)
Require(t, err)

msgCnt, err := l2Node.ConsensusNode.TxStreamer.GetMessageCount()
Require(t, err)

err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool {
validatedCnt := l2Node.ConsensusNode.BlockValidator.Validated(t)
return validatedCnt == msgCnt
})
Require(t, err)
}
Loading