From 02e2be04e5d3e0e415e2119f1fee5202b5490a3c Mon Sep 17 00:00:00 2001 From: amsanghi Date: Mon, 22 Jan 2024 20:25:30 +0530 Subject: [PATCH] Prefetch state needed for future block executions by executing them in parallel against old state --- arbnode/transaction_streamer.go | 10 ++++++- execution/gethexec/executionengine.go | 38 +++++++++++++++++++++++---- execution/gethexec/node.go | 4 +-- execution/gethexec/sequencer.go | 7 +++++ execution/interface.go | 2 +- 5 files changed, 52 insertions(+), 9 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 24ef2a7cc4..5491cbdbf2 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -968,7 +968,15 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos) return false } - err = s.exec.DigestMessage(pos, msg) + var msgForPrefetch *arbostypes.MessageWithMetadata + if pos+1 < msgCount { + msgForPrefetch, err = s.GetMessage(pos + 1) + if err != nil { + log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1) + return false + } + } + err = s.exec.DigestMessage(pos, msg, msgForPrefetch) if err != nil { logger := log.Warn if prevMessageCount < msgCount { diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 58e91a197e..d376c59ba2 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -41,6 +41,8 @@ type ExecutionEngine struct { nextScheduledVersionCheck time.Time // protected by the createBlocksMutex reorgSequencing bool + + prefetchBlock bool } func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) { @@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() { s.reorgSequencing = true } +func (s *ExecutionEngine) EnablePrefetchBlock() { + if s.Started() { + panic("trying to enable prefetch block after start") + } + if s.prefetchBlock { + panic("trying to enable prefetch block when already set") + } + s.prefetchBlock = true +} + func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) { if s.Started() { panic("trying to set transaction streamer after start") @@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost return err } for i := range newMessages { - err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i]) + var msgForPrefetch *arbostypes.MessageWithMetadata + if i < len(newMessages)-1 { + msgForPrefetch = &newMessages[i] + } + err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch) if err != nil { return err } @@ -486,15 +502,15 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) } -func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { +func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { if !s.createBlocksMutex.TryLock() { return errors.New("createBlock mutex held") } defer s.createBlocksMutex.Unlock() - return s.digestMessageWithBlockMutex(num, msg) + return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch) } -func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { +func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { currentHeader, err := s.getCurrentHeader() if err != nil { return err @@ -508,11 +524,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, } startTime := time.Now() + var wg sync.WaitGroup + if s.prefetchBlock && msgForPrefetch != nil { + wg.Add(1) + go func() { + defer wg.Done() + _, _, _, err := s.createBlockFromNextMessage(msgForPrefetch) + if err != nil { + return + } + }() + } + block, statedb, receipts, err := s.createBlockFromNextMessage(msg) if err != nil { return err } - + wg.Wait() err = s.appendBlock(block, statedb, receipts, time.Since(startTime)) if err != nil { return err diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 00337cc355..1ad73febe7 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() { // } } -func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { - return n.ExecEngine.DigestMessage(num, msg) +func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { + return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch) } func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error { return n.ExecEngine.Reorg(count, newMessages, oldMessages) diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 5db38cbb4d..9bc6f4378d 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -66,6 +66,7 @@ type SequencerConfig struct { MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"` NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"` NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"` + EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` } func (c *SequencerConfig) Validate() error { @@ -97,6 +98,7 @@ var DefaultSequencerConfig = SequencerConfig{ MaxTxDataSize: 95000, NonceFailureCacheSize: 1024, NonceFailureCacheExpiry: time.Second, + EnablePrefetchBlock: false, } var TestSequencerConfig = SequencerConfig{ @@ -112,6 +114,7 @@ var TestSequencerConfig = SequencerConfig{ MaxTxDataSize: 95000, NonceFailureCacheSize: 1024, NonceFailureCacheExpiry: time.Second, + EnablePrefetchBlock: false, } func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -127,6 +130,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept") f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor") f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high") + f.Bool(prefix+".enable-prefetch-block", DefaultSequencerConfig.EnablePrefetchBlock, "enable prefetching of blocks") } type txQueueItem struct { @@ -324,6 +328,9 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead } s.Pause() execEngine.EnableReorgSequencing() + if config.EnablePrefetchBlock { + execEngine.EnablePrefetchBlock() + } return s, nil } diff --git a/execution/interface.go b/execution/interface.go index ef9409b9c1..414c31f64e 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -28,7 +28,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken") // always needed type ExecutionClient interface { - DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error + DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)