Skip to content

Commit

Permalink
Merge branch 'master' into merkle-perf
Browse files Browse the repository at this point in the history
  • Loading branch information
eljobe committed May 24, 2024
2 parents b0bc191 + 5466cfa commit db24c7c
Show file tree
Hide file tree
Showing 19 changed files with 332 additions and 105 deletions.
14 changes: 7 additions & 7 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,31 +851,31 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti
// different type with a lower nonce.
// If we decide not to send this tx yet, just leave it queued and with Sent set to false.
// The resending/repricing loop in DataPoster.Start will keep trying.
if !newTx.Sent && newTx.FullTx.Nonce() > 0 {
previouslySent := newTx.Sent || (prevTx != nil && prevTx.Sent) // if we've previously sent this nonce
if !previouslySent && newTx.FullTx.Nonce() > 0 {
precedingTx, err := p.queue.Get(ctx, arbmath.SaturatingUSub(newTx.FullTx.Nonce(), 1))
if err != nil {
return fmt.Errorf("couldn't get preceding tx in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
if precedingTx != nil { // precedingTx == nil -> the actual preceding tx was already confirmed
var latestBlockNumber, prevBlockNumber, reorgResistantNonce uint64
var latestBlockNumber, prevBlockNumber, reorgResistantTxCount uint64
if precedingTx.FullTx.Type() != newTx.FullTx.Type() || !precedingTx.Sent {
latestBlockNumber, err = p.client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("couldn't get block number in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
prevBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, 1)
reorgResistantNonce, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber))
reorgResistantTxCount, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber))
if err != nil {
return fmt.Errorf("couldn't determine reorg resistant nonce in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}

if precedingTx.FullTx.Nonce() > reorgResistantNonce {
log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent)
if newTx.FullTx.Nonce() > reorgResistantTxCount {
log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount)
return nil
}
} else {
log.Info("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantNonce", reorgResistantNonce)
}
log.Debug("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount)
}
}

Expand Down
3 changes: 2 additions & 1 deletion arbnode/dataposter/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter/redis"
"github.com/offchainlabs/nitro/arbnode/dataposter/slice"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/cmd/conf"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/signature"
Expand All @@ -44,7 +45,7 @@ func newLevelDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.St

func newPebbleDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.Storage {
t.Helper()
db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true)
db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true, conf.PersistentConfigDefault.Pebble.ExtraOptions("pebble"))
if err != nil {
t.Fatalf("NewPebbleDBDatabase() unexpected error: %v", err)
}
Expand Down
90 changes: 55 additions & 35 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@ func (c *Config) Validate() error {
if c.EndBlock < c.StartBlock {
return errors.New("invalid block range for blocks re-execution")
}
if c.Room == 0 {
return errors.New("room for blocks re-execution cannot be zero")
if c.Room <= 0 {
return errors.New("room for blocks re-execution should be greater than 0")
}
return nil
}

var DefaultConfig = Config{
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
BlocksPerThread: 10000,
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
}

var TestConfig = Config{
Expand All @@ -66,13 +65,14 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
Expand All @@ -84,32 +84,47 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
start = chainStart
end = chainEnd
}
if start < chainStart {
log.Warn("state reexecutor's start block number is lower than genesis, resetting to genesis")
if start < chainStart || start > chainEnd {
log.Warn("invalid state reexecutor's start block number, resetting to genesis", "start", start, "genesis", chainStart)
start = chainStart
}
if end > chainEnd {
log.Warn("state reexecutor's end block number is greater than latest, resetting to latest")
if end > chainEnd || end < chainStart {
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
}
if c.Mode == "random" && end != start {
if c.BlocksPerThread > end-start {
c.BlocksPerThread = end - start
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := blocksPerThread
if rng > end-start {
rng = end - start
}
start += uint64(rand.Intn(int(end - start - c.BlocksPerThread + 1)))
end = start + c.BlocksPerThread
start += uint64(rand.Intn(int(end - start - rng + 1)))
end = start + rng
}
// inclusive of block reexecution [start, end]
if start > 0 {
// Inclusive of block reexecution [start, end]
// Do not reexecute genesis block i,e chainStart
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
work := (end - start) / uint64(c.Room)
if work > 0 {
blocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
Expand All @@ -119,17 +134,17 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
start := arbmath.SaturatingUSub(currentBlock, s.config.BlocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
// we don't use state release pattern here
// TODO do we want to use release pattern here?
startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
if err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
return s.startBlock
}
// NoOp
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
Expand Down Expand Up @@ -169,9 +184,14 @@ func (s *BlocksReExecutor) Impl(ctx context.Context) {
log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", s.startBlock, "startBlock", s.startBlock+1, "endBlock", end)
}

func (s *BlocksReExecutor) Start(ctx context.Context) {
func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
s.StopWaiter.Start(ctx, s)
s.LaunchThread(s.Impl)
s.LaunchThread(func(ctx context.Context) {
s.Impl(ctx)
if done != nil {
close(done)
}
})
}

func (s *BlocksReExecutor) StopAndWait() {
Expand Down
Loading

0 comments on commit db24c7c

Please sign in to comment.