Commit

Merge pull request #2765 from OffchainLabs/bulksyncing-missing-blockmetadata

Bulk syncing of missing BlockMetadata
ganeshvanahalli authored Dec 30, 2024
2 parents d79ffde + f1f1198 commit a9bbd0b
Showing 11 changed files with 417 additions and 63 deletions.
151 changes: 151 additions & 0 deletions arbnode/blockmetadata.go
@@ -0,0 +1,151 @@
package arbnode

import (
"bytes"
"context"
"encoding/binary"
"time"

"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

type BlockMetadataFetcherConfig struct {
    Enable         bool                   `koanf:"enable"`
    Source         rpcclient.ClientConfig `koanf:"source" reload:"hot"`
    SyncInterval   time.Duration          `koanf:"sync-interval"`
    APIBlocksLimit uint64                 `koanf:"api-blocks-limit"`
}

var DefaultBlockMetadataFetcherConfig = BlockMetadataFetcherConfig{
    Enable:         false,
    Source:         rpcclient.DefaultClientConfig,
    SyncInterval:   time.Minute * 5,
    APIBlocksLimit: 100,
}

func BlockMetadataFetcherConfigAddOptions(prefix string, f *pflag.FlagSet) {
    f.Bool(prefix+".enable", DefaultBlockMetadataFetcherConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata API. If the source doesn't have the missing blockMetadata, we keep retrying every sync-interval (default: 5 minutes)")
    rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataFetcherConfig.Source)
    f.Duration(prefix+".sync-interval", DefaultBlockMetadataFetcherConfig.SyncInterval, "interval at which missing blockMetadata is regularly synced")
    f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataFetcherConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+
        "This should be set less than or equal to the limit on the API provider side")
}

type BlockMetadataFetcher struct {
    stopwaiter.StopWaiter
    config BlockMetadataFetcherConfig
    db     ethdb.Database
    client *rpcclient.RpcClient
    exec   execution.ExecutionClient
}

func NewBlockMetadataFetcher(ctx context.Context, c BlockMetadataFetcherConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataFetcher, error) {
    client := rpcclient.NewRpcClient(func() *rpcclient.ClientConfig { return &c.Source }, nil)
    if err := client.Start(ctx); err != nil {
        return nil, err
    }
    return &BlockMetadataFetcher{
        config: c,
        db:     db,
        client: client,
        exec:   exec,
    }, nil
}

func (b *BlockMetadataFetcher) fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) {
    var result []gethexec.NumberAndBlockMetadata
    // #nosec G115
    err := b.client.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(fromBlock), rpc.BlockNumber(toBlock))
    if err != nil {
        return nil, err
    }
    return result, nil
}

func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error {
    batch := b.db.NewBatch()
    queryMap := util.ArrayToSet(query)
    for _, elem := range result {
        pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber)
        if err != nil {
            return err
        }
        if _, ok := queryMap[uint64(pos)]; ok {
            if err := batch.Put(dbKey(blockMetadataInputFeedPrefix, uint64(pos)), elem.RawMetadata); err != nil {
                return err
            }
            if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil {
                return err
            }
            // If we reached the ideal batch size, commit and reset
            if batch.ValueSize() >= ethdb.IdealBatchSize {
                if err := batch.Write(); err != nil {
                    return err
                }
                batch.Reset()
            }
        }
    }
    return batch.Write()
}

func (b *BlockMetadataFetcher) Update(ctx context.Context) time.Duration {
    handleQuery := func(query []uint64) bool {
        result, err := b.fetch(
            ctx,
            b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])),
            b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])),
        )
        if err != nil {
            log.Error("Error getting result from bulk blockMetadata API", "err", err)
            return false
        }
        if err = b.persistBlockMetadata(query, result); err != nil {
            log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err)
            return false
        }
        return true
    }
    iter := b.db.NewIterator(missingBlockMetadataInputFeedPrefix, nil)
    defer iter.Release()
    var query []uint64
    for iter.Next() {
        keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix)
        query = append(query, binary.BigEndian.Uint64(keyBytes))
        end := len(query) - 1
        if query[end]-query[0]+1 >= uint64(b.config.APIBlocksLimit) {
            if query[end]-query[0]+1 > uint64(b.config.APIBlocksLimit) && len(query) >= 2 {
                end -= 1
            }
            if success := handleQuery(query[:end+1]); !success {
                return b.config.SyncInterval
            }
            query = query[end+1:]
        }
    }
    if len(query) > 0 {
        _ = handleQuery(query)
    }
    return b.config.SyncInterval
}

func (b *BlockMetadataFetcher) Start(ctx context.Context) {
    b.StopWaiter.Start(ctx, b)
    b.CallIteratively(b.Update)
}

func (b *BlockMetadataFetcher) StopAndWait() {
    b.StopWaiter.StopAndWait()
    b.client.Close()
}
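
For illustration only (not code from this commit): a minimal standalone client for the bulk blockMetadata API that fetch relies on might look like the sketch below. The endpoint URL is a placeholder assumption; the method name, parameter shape, and result type mirror the call made in fetch.

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/ethereum/go-ethereum/rpc"

        "github.com/offchainlabs/nitro/execution/gethexec"
    )

    func main() {
        // Hypothetical source node that serves arb_getRawBlockMetadata.
        client, err := rpc.Dial("http://localhost:8547")
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Same call shape as BlockMetadataFetcher.fetch: a from-block and a
        // to-block passed as rpc.BlockNumber values.
        var result []gethexec.NumberAndBlockMetadata
        err = client.CallContext(context.Background(), &result, "arb_getRawBlockMetadata",
            rpc.BlockNumber(100), rpc.BlockNumber(150))
        if err != nil {
            log.Fatal(err)
        }
        for _, m := range result {
            fmt.Printf("block %d: %d blockMetadata bytes\n", m.BlockNumber, len(m.RawMetadata))
        }
    }

Update, in turn, walks the missing-blockMetadata keys and groups them into queries whose block span stays within api-blocks-limit. A simplified standalone rendering of that grouping is sketched below; the actual code above additionally trims the last index from a window when including it would overshoot the limit.

    // splitIntoWindows groups an ascending list of missing message indices into
    // windows whose span (last - first + 1) never exceeds limit, so each window
    // can be served by a single arb_getRawBlockMetadata call.
    func splitIntoWindows(missing []uint64, limit uint64) [][]uint64 {
        var windows [][]uint64
        var current []uint64
        for _, idx := range missing {
            if len(current) > 0 && idx-current[0]+1 > limit {
                windows = append(windows, current)
                current = nil
            }
            current = append(current, idx)
        }
        if len(current) > 0 {
            windows = append(windows, current)
        }
        return windows
    }
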
94 changes: 59 additions & 35 deletions arbnode/node.go
@@ -82,23 +82,24 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com
}

type Config struct {
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker legacystaker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
Bold boldstaker.BoldConfig `koanf:"bold"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
Sequencer bool `koanf:"sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"`
MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"`
BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"`
Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"`
Staker legacystaker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
Bold boldstaker.BoldConfig `koanf:"bold"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
BlockMetadataFetcher BlockMetadataFetcherConfig `koanf:"block-metadata-fetcher" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}
@@ -135,6 +136,12 @@ func (c *Config) Validate() error {
if err := c.Staker.Validate(); err != nil {
return err
}
if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 {
return errors.New("when sequencer is enabled track-block-metadata-from should be set as well")
}
if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataFetcher.Enable {
log.Warn("track-block-metadata-from is set but blockMetadata fetcher is not enabled")
}
return nil
}

@@ -165,27 +172,29 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed
DangerousConfigAddOptions(prefix+".dangerous", f)
TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f)
MaintenanceConfigAddOptions(prefix+".maintenance", f)
BlockMetadataFetcherConfigAddOptions(prefix+".block-metadata-fetcher", f)
}

var ConfigDefault = Config{
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: legacystaker.DefaultL1ValidatorConfig,
Bold: boldstaker.DefaultBoldConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
Sequencer: false,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
BatchPoster: DefaultBatchPosterConfig,
MessagePruner: DefaultMessagePrunerConfig,
BlockValidator: staker.DefaultBlockValidatorConfig,
Feed: broadcastclient.FeedConfigDefault,
Staker: legacystaker.DefaultL1ValidatorConfig,
Bold: boldstaker.DefaultBoldConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
BlockMetadataFetcher: DefaultBlockMetadataFetcherConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
@@ -195,6 +204,7 @@ func ConfigDefaultL1Test() *Config {
config.SeqCoordinator = TestSeqCoordinatorConfig
config.Sequencer = true
config.Dangerous.NoSequencerCoordinator = true
config.TransactionStreamer.TrackBlockMetadataFrom = 1

return config
}
@@ -280,6 +290,7 @@ type Node struct {
MaintenanceRunner *MaintenanceRunner
DASLifecycleManager *das.LifecycleManager
SyncMonitor *SyncMonitor
blockMetadataFetcher *BlockMetadataFetcher
configFetcher ConfigFetcher
ctx context.Context
}
@@ -511,6 +522,14 @@ func createNodeImpl(
}
}

var blockMetadataFetcher *BlockMetadataFetcher
if config.BlockMetadataFetcher.Enable {
blockMetadataFetcher, err = NewBlockMetadataFetcher(ctx, config.BlockMetadataFetcher, arbDb, exec)
if err != nil {
return nil, err
}
}

if !config.ParentChainReader.Enable {
return &Node{
ArbDB: arbDb,
@@ -534,6 +553,7 @@
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: nil,
SyncMonitor: syncMonitor,
blockMetadataFetcher: blockMetadataFetcher,
configFetcher: configFetcher,
ctx: ctx,
}, nil
@@ -767,6 +787,7 @@ func createNodeImpl(
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: dasLifecycleManager,
SyncMonitor: syncMonitor,
blockMetadataFetcher: blockMetadataFetcher,
configFetcher: configFetcher,
ctx: ctx,
}, nil
@@ -950,6 +971,9 @@ func (n *Node) Start(ctx context.Context) error {
n.BroadcastClients.Start(ctx)
}()
}
if n.blockMetadataFetcher != nil {
n.blockMetadataFetcher.Start(ctx)
}
if n.configFetcher != nil {
n.configFetcher.Start(ctx)
}
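For illustration only (not part of this commit): constructing and starting the fetcher by hand follows the same pattern as the additions to createNodeImpl and Node.Start above. The helper below is a sketch; variable names such as arbDb and execClient are placeholders, and it assumes the context, ethdb, execution, and arbnode imports.

    // startBlockMetadataFetcher shows the construction/start pattern added to
    // node.go in this commit; illustrative only.
    func startBlockMetadataFetcher(ctx context.Context, arbDb ethdb.Database, execClient execution.ExecutionClient) (*arbnode.BlockMetadataFetcher, error) {
        cfg := arbnode.DefaultBlockMetadataFetcherConfig
        cfg.Enable = true
        // cfg.Source must point at an RPC endpoint that serves arb_getRawBlockMetadata.
        fetcher, err := arbnode.NewBlockMetadataFetcher(ctx, cfg, arbDb, execClient)
        if err != nil {
            return nil, err
        }
        fetcher.Start(ctx)
        return fetcher, nil
    }
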
19 changes: 10 additions & 9 deletions arbnode/schema.go
@@ -4,15 +4,16 @@
package arbnode

var (
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
missingBlockMetadataInputFeedPrefix []byte = []byte("x") // maps a message sequence number whose blockMetaData byte array is missing to nil
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
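
The iterator in Update assumes that keys under missingBlockMetadataInputFeedPrefix are the one-byte prefix followed by the message sequence number as an 8-byte big-endian integer (it trims the prefix and decodes the remainder with binary.BigEndian). The helper below is a sketch of that encoding; it assumes the arbnode package context and is not the repository's actual dbKey implementation.

    // missingBlockMetadataKey builds prefix || bigEndian(msgSeqNum), matching
    // how Update parses the keys it iterates over.
    func missingBlockMetadataKey(msgSeqNum uint64) []byte {
        key := make([]byte, 0, len(missingBlockMetadataInputFeedPrefix)+8)
        key = append(key, missingBlockMetadataInputFeedPrefix...)
        var buf [8]byte
        binary.BigEndian.PutUint64(buf[:], msgSeqNum)
        return append(key, buf[:]...)
    }
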