Skip to content

Commit

Permalink
Merge branch 'master' into design-approved-check
Browse files Browse the repository at this point in the history
  • Loading branch information
tsahee authored Jun 4, 2024
2 parents 453d76c + 1db63cc commit c6ca1ef
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 26 deletions.
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil)
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
3 changes: 3 additions & 0 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
return err
}
if batchCount > 0 {
if r.tracker.snapSyncConfig.Enabled {
break
}
// Validate the init message matches our L2 blockchain
message, err := r.tracker.GetDelayedMessage(0)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
}
execEngine.Initialize(gethexec.DefaultCachingConfig.StylusLRUCache)
execSeq := &execClientWrapper{execEngine, t}
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher)
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
if err != nil {
Fail(t, err)
}
Expand Down
88 changes: 70 additions & 18 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,24 @@ var (
)

type InboxTracker struct {
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
dapReaders []daprovider.Reader
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
dapReaders []daprovider.Reader
snapSyncConfig SnapSyncConfig

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
snapSyncConfig: snapSyncConfig,
}
return tracker, nil
}
Expand Down Expand Up @@ -385,16 +387,40 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) {
}

func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error {
var nextAcc common.Hash
firstDelayedMsgToKeep := uint64(0)
if len(messages) == 0 {
return nil
}
t.mutex.Lock()
defer t.mutex.Unlock()

pos, err := messages[0].Message.Header.SeqNum()
if err != nil {
return err
}
if t.snapSyncConfig.Enabled && pos < t.snapSyncConfig.DelayedCount {
firstDelayedMsgToKeep = t.snapSyncConfig.DelayedCount
if firstDelayedMsgToKeep > 0 {
firstDelayedMsgToKeep--
}
for {
if len(messages) == 0 {
return nil
}
pos, err = messages[0].Message.Header.SeqNum()
if err != nil {
return err
}
if pos+1 == firstDelayedMsgToKeep {
nextAcc = messages[0].AfterInboxAcc()
}
if pos < firstDelayedMsgToKeep {
messages = messages[1:]
} else {
break
}
}
}
t.mutex.Lock()
defer t.mutex.Unlock()

if !hardReorg {
// This math is safe to do as we know len(messages) > 0
Expand All @@ -409,8 +435,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR
}
}

var nextAcc common.Hash
if pos > 0 {
if pos > firstDelayedMsgToKeep {
var err error
nextAcc, err = t.GetDelayedAcc(pos - 1)
if err != nil {
Expand Down Expand Up @@ -598,17 +623,44 @@ func (b *multiplexerBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1Inco
var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missing or different")

func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch) error {
var nextAcc common.Hash
var prevbatchmeta BatchMetadata
sequenceNumberToKeep := uint64(0)
if len(batches) == 0 {
return nil
}
if t.snapSyncConfig.Enabled && batches[0].SequenceNumber < t.snapSyncConfig.BatchCount {
sequenceNumberToKeep = t.snapSyncConfig.BatchCount
if sequenceNumberToKeep > 0 {
sequenceNumberToKeep--
}
for {
if len(batches) == 0 {
return nil
}
if batches[0].SequenceNumber+1 == sequenceNumberToKeep {
nextAcc = batches[0].AfterInboxAcc
prevbatchmeta = BatchMetadata{
Accumulator: batches[0].AfterInboxAcc,
DelayedMessageCount: batches[0].AfterDelayedCount,
MessageCount: arbutil.MessageIndex(t.snapSyncConfig.PrevBatchMessageCount),
ParentChainBlock: batches[0].ParentChainBlockNumber,
}
}
if batches[0].SequenceNumber < sequenceNumberToKeep {
batches = batches[1:]
} else {
break
}
}
}
t.mutex.Lock()
defer t.mutex.Unlock()

pos := batches[0].SequenceNumber
startPos := pos
var nextAcc common.Hash
var prevbatchmeta BatchMetadata
if pos > 0 {

if pos > sequenceNumberToKeep {
var err error
prevbatchmeta, err = t.GetBatchMetadata(pos - 1)
nextAcc = prevbatchmeta.Accumulator
Expand Down
23 changes: 21 additions & 2 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Config struct {
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -175,6 +177,7 @@ var ConfigDefault = Config{
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down Expand Up @@ -273,6 +276,22 @@ type Node struct {
ctx context.Context
}

type SnapSyncConfig struct {
Enabled bool
PrevBatchMessageCount uint64
PrevDelayedRead uint64
BatchCount uint64
DelayedCount uint64
}

var DefaultSnapSyncConfig = SnapSyncConfig{
Enabled: false,
PrevBatchMessageCount: 0,
BatchCount: 0,
DelayedCount: 0,
PrevDelayedRead: 0,
}

type ConfigFetcher interface {
Get() *Config
Start(context.Context)
Expand Down Expand Up @@ -410,7 +429,7 @@ func createNodeImpl(
}

transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher)
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -540,7 +559,7 @@ func createNodeImpl(
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
}
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSyncTest)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type TransactionStreamer struct {
execLastMsgCount arbutil.MessageIndex
validator *staker.BlockValidator

db ethdb.Database
fatalErrChan chan<- error
config TransactionStreamerConfigFetcher
db ethdb.Database
fatalErrChan chan<- error
config TransactionStreamerConfigFetcher
snapSyncConfig *SnapSyncConfig

insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex
Expand Down Expand Up @@ -106,6 +107,7 @@ func NewTransactionStreamer(
broadcastServer *broadcaster.Broadcaster,
fatalErrChan chan<- error,
config TransactionStreamerConfigFetcher,
snapSyncConfig *SnapSyncConfig,
) (*TransactionStreamer, error) {
streamer := &TransactionStreamer{
exec: exec,
Expand All @@ -115,6 +117,7 @@ func NewTransactionStreamer(
broadcastServer: broadcastServer,
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
cachedL1PriceData: &L1PriceData{
msgToL1PriceData: []L1PriceDataOfMsg{},
},
Expand Down Expand Up @@ -793,6 +796,9 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
}

func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) {
if s.snapSyncConfig.Enabled && uint64(pos) == s.snapSyncConfig.PrevBatchMessageCount {
return s.snapSyncConfig.PrevDelayedRead, nil
}
var prevDelayedRead uint64
if pos > 0 {
prevMsg, err := s.GetMessage(pos - 1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, arbnode.DefaultSnapSyncConfig)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func (c *SequencerConfig) Validate() error {
if c.expectedSurplusSoftThreshold < c.expectedSurplusHardThreshold {
return errors.New("expected-surplus-soft-threshold cannot be lower than expected-surplus-hard-threshold")
}
if c.MaxTxDataSize > arbostypes.MaxL2MessageSize-50000 {
return errors.New("max-tx-data-size too large for MaxL2MessageSize")
}
return nil
}

Expand Down
Loading

0 comments on commit c6ca1ef

Please sign in to comment.