Skip to content

Commit

Permalink
core/rawdb: optimize write block with sidecars
Browse files Browse the repository at this point in the history
core/rawdb: rollback interface freezeRange and remove freezeRangeWithBlobs
  • Loading branch information
buddh0 committed Mar 22, 2024
1 parent 2313e2c commit 5df5f71
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 165 deletions.
8 changes: 2 additions & 6 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,12 +1408,8 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
writeSize int64
err error
)
if !bc.chainConfig.IsCancun(last.Number(), last.Time()) {
log.Info("WriteAncientBlocks", "startAt", blockChain[0].Number(), "last", last.Number())
writeSize, err = rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
} else {
writeSize, err = rawdb.WriteAncientBlocksAfterCancun(bc.db, bc.chainConfig, blockChain, receiptChain, td)
}
writeSize, err = rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)

if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
Expand Down
11 changes: 10 additions & 1 deletion core/data_availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"crypto/sha256"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -46,9 +47,17 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobSidecar) error

// IsDataAvailable it checks that the blobTx block has available blob data
func IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block) (err error) {
// refer logic in ValidateBody
if !chain.Config().IsCancun(block.Number(), block.Time()) {
return nil
if block.Sidecars() == nil {
return nil
} else {
return errors.New("sidecars present in block body before cancun")
}
} else if block.Sidecars() == nil {
return errors.New("missing sidecars in block body after cancun")
}

// only required to check within MinBlocksForBlobRequests block's DA
highest := chain.ChasingHead()
current := chain.CurrentHeader()
Expand Down
146 changes: 14 additions & 132 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,121 +800,18 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {

// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
stReceipts []*types.ReceiptForStorage
)
return db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i, block := range blocks {
// Convert receipts to storage format and sum up total difficulty.
stReceipts = stReceipts[:0]
for _, receipt := range receipts[i] {
stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt))
}
header := block.Header()
if i > 0 {
tdSum.Add(tdSum, header.Difficulty)
}
if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil {
return err
}
}
return nil
})
}

// WriteAncientBlocksAfterCancun writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocksAfterCancun(db ethdb.AncientStore, config *params.ChainConfig, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
if len(blocks) == 0 {
return 0, nil
}

cancunIndex := -1
for i, block := range blocks {
if config.IsCancun(block.Number(), block.Time()) {
if block.Sidecars() != nil {
cancunIndex = i
break
}
}
log.Info("WriteAncientBlocksAfterCancun", "startAt", blocks[0].Number(), "cancunIndex", cancunIndex, "len", len(blocks))
if cancunIndex < 0 {
return WriteAncientBlocks(db, blocks, receipts, td)
}

// if there has pre-cancun and post-cancun blocks, write them separately
var (
tdSum = new(big.Int).Set(td)
stReceipts []*types.ReceiptForStorage
)

// write pre-cancun blocks
preBlocks := blocks[:cancunIndex]
preReceipts := receipts[:cancunIndex]
preSize, err := db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i, block := range preBlocks {
// Convert receipts to storage format and sum up total difficulty.
stReceipts = stReceipts[:0]
for _, receipt := range preReceipts[i] {
stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt))
}
header := block.Header()
if i > 0 {
tdSum.Add(tdSum, header.Difficulty)
}
if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil {
return err
}
}
return nil
})
if err != nil {
return preSize, err
}

// write post-cancun blocks
postBlocks := blocks[cancunIndex:]
postReceipts := receipts[cancunIndex:]
// try reset empty blob ancient table
if err := ResetEmptyBlobAncientTable(db, postBlocks[0].NumberU64()); err != nil {
return 0, err
}
postSize, err := db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i, block := range postBlocks {
// Convert receipts to storage format and sum up total difficulty.
stReceipts = stReceipts[:0]
for _, receipt := range postReceipts[i] {
stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt))
}
header := block.Header()
if i > 0 {
tdSum.Add(tdSum, header.Difficulty)
}
if err := writeAncientBlockWithSidecar(op, block, header, stReceipts, tdSum, block.Sidecars()); err != nil {
return err
}
log.Info("WriteAncientBlocks", "startAt", blocks[0].Number(), "cancunIndex", cancunIndex, "len", len(blocks))
if cancunIndex >= 0 {
if err := ResetEmptyBlobAncientTable(db, blocks[cancunIndex].NumberU64()); err != nil {
return 0, err
}
return nil
})
return preSize + postSize, err
}

// WriteAncientBlocksWithSidecars writes entire block data into ancient store and returns the total written size.
// Attention: The caller must set blobs after cancun
func WriteAncientBlocksWithSidecars(db ethdb.AncientStore, blocks []*types.Block, receipts []types.Receipts, td *big.Int, sidecars []types.BlobSidecars) (int64, error) {
if len(blocks) == 0 {
return 0, nil
}

// do some sanity check
if len(blocks) != len(sidecars) {
return 0, fmt.Errorf("the sidecars len is different with blocks, %v:%v", len(sidecars), len(blocks))
}
if len(blocks) != len(receipts) {
return 0, fmt.Errorf("the receipts len is different with blocks, %v:%v", len(receipts), len(blocks))
}
// try reset empty blob ancient table
if err := ResetEmptyBlobAncientTable(db, blocks[0].NumberU64()); err != nil {
return 0, err
}

var (
Expand All @@ -932,7 +829,7 @@ func WriteAncientBlocksWithSidecars(db ethdb.AncientStore, blocks []*types.Block
if i > 0 {
tdSum.Add(tdSum, header.Difficulty)
}
if err := writeAncientBlockWithSidecar(op, block, header, stReceipts, tdSum, sidecars[i]); err != nil {
if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil {
return err
}
}
Expand Down Expand Up @@ -983,8 +880,8 @@ func WriteBlobSidecars(db ethdb.KeyValueWriter, hash common.Hash, number uint64,
}
}

// DeleteBlobSidecars removes all blob data associated with a block hash.
func DeleteBlobSidecars(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
// deleteBlobSidecars removes all blob data associated with a block hash.
func deleteBlobSidecars(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockBlobSidecarsKey(number, hash)); err != nil {
log.Crit("Failed to delete block blobs", "err", err)
}
Expand All @@ -1007,25 +904,10 @@ func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *type
if err := op.Append(ChainFreezerDifficultyTable, num, td); err != nil {
return fmt.Errorf("can't append block %d total difficulty: %v", num, err)
}
return nil
}

func writeAncientSidecar(op ethdb.AncientWriteOp, num uint64, blobs types.BlobSidecars) error {
if err := op.Append(ChainFreezerBlobSidecarTable, num, blobs); err != nil {
return fmt.Errorf("can't append block %d blobs: %v", num, err)
}
return nil
}

// writeAncientBlockWithSidecar writes entire block data into ancient store and returns the total written size.
// Attention: The caller must set blobs after cancun
func writeAncientBlockWithSidecar(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int, sidecars types.BlobSidecars) error {
num := block.NumberU64()
if err := writeAncientBlock(op, block, header, receipts, td); err != nil {
return err
}
if err := writeAncientSidecar(op, num, sidecars); err != nil {
return err
if len(block.Sidecars()) > 0 {
if err := op.Append(ChainFreezerBlobSidecarTable, num, block.Sidecars()); err != nil {
return fmt.Errorf("can't append block %d blobs: %v", num, err)
}
}
return nil
}
Expand All @@ -1036,7 +918,7 @@ func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
DeleteHeader(db, hash, number)
DeleteBody(db, hash, number)
DeleteTd(db, hash, number)
DeleteBlobSidecars(db, hash, number) // it is safe to delete non-exist blob
deleteBlobSidecars(db, hash, number) // it is safe to delete non-exist blob
}

// DeleteBlockWithoutNumber removes all block data associated with a hash, except
Expand All @@ -1046,7 +928,7 @@ func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
deleteHeaderWithoutNumber(db, hash, number)
DeleteBody(db, hash, number)
DeleteTd(db, hash, number)
DeleteBlobSidecars(db, hash, number)
deleteBlobSidecars(db, hash, number)
}

const badBlockToKeep = 10
Expand Down
8 changes: 5 additions & 3 deletions core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func TestBlockBlobSidecarsStorage(t *testing.T) {
}
}

DeleteBlobSidecars(db, blkHash, 0)
deleteBlobSidecars(db, blkHash, 0)
if bs := ReadRawBlobSidecars(db, blkHash, 0); len(bs) != 0 {
t.Fatalf("deleted sidecars returned: %v", bs)
}
Expand Down Expand Up @@ -686,8 +686,10 @@ func BenchmarkWriteAncientBlocks(b *testing.B) {

blocks := allBlocks[i : i+length]
receipts := batchReceipts[:length]
sidecars := batchSidecars[:length]
writeSize, err := WriteAncientBlocksWithSidecars(db, blocks, receipts, td, sidecars)
for j := 0; j < length; j++ {
blocks[i+j].WithSidecars(batchSidecars[j])
}
writeSize, err := WriteAncientBlocks(db, blocks, receipts, td)
if err != nil {
b.Fatal(err)
}
Expand Down
23 changes: 13 additions & 10 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,8 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
if limit-first > freezerBatchLimit {
limit = first + freezerBatchLimit
}
var env *ethdb.FreezerEnv
env, _ = f.freezeEnv.Load().(*ethdb.FreezerEnv)
ancients, err := f.freezeRangeWithBlobs(nfdb, first, limit, env)

ancients, err := f.freezeRangeWithBlobs(nfdb, first, limit)
if err != nil {
log.Error("Error in block freeze operation", "err", err)
backoff = true
Expand Down Expand Up @@ -250,6 +249,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
}
log.Debug("Deep froze chain segment", context...)

env, _ := f.freezeEnv.Load().(*ethdb.FreezerEnv)
// try prune blob data after cancun fork
if isCancun(env, head.Number, head.Time) {
f.tryPruneBlobAncient(env, *number)
Expand Down Expand Up @@ -293,7 +293,7 @@ func getBlobExtraReserveFromEnv(env *ethdb.FreezerEnv) uint64 {
return env.BlobExtraReserve
}

func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint64, env *ethdb.FreezerEnv) (hashes []common.Hash, err error) {
func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
defer func() {
log.Info("freezeRangeWithBlobs", "from", number, "to", limit, "err", err)
}()
Expand All @@ -305,8 +305,9 @@ func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint
if last == nil {
return nil, fmt.Errorf("block header missing, can't freeze block %d", limit)
}
env, _ := f.freezeEnv.Load().(*ethdb.FreezerEnv)
if !isCancun(env, last.Number, last.Time) {
return f.freezeRange(nfdb, number, limit, env)
return f.freezeRange(nfdb, number, limit)
}

var (
Expand All @@ -330,11 +331,11 @@ func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint
}
}
if !found {
return f.freezeRange(nfdb, number, limit, env)
return f.freezeRange(nfdb, number, limit)
}

// freeze pre cancun
preHashes, err := f.freezeRange(nfdb, number, cancunNumber-1, env)
preHashes, err := f.freezeRange(nfdb, number, cancunNumber-1)
if err != nil {
return preHashes, err
}
Expand All @@ -343,15 +344,17 @@ func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint
return preHashes, err
}
// freeze post cancun
postHashes, err := f.freezeRange(nfdb, cancunNumber, limit, env)
postHashes, err := f.freezeRange(nfdb, cancunNumber, limit)
hashes = append(preHashes, postHashes...)
return hashes, err
}

func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64, env *ethdb.FreezerEnv) (hashes []common.Hash, err error) {
func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
if number > limit {
return nil, nil
}

env, _ := f.freezeEnv.Load().(*ethdb.FreezerEnv)
hashes = make([]common.Hash, 0, limit-number)
_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for ; number <= limit; number++ {
Expand Down Expand Up @@ -437,6 +440,6 @@ func isCancun(env *ethdb.FreezerEnv, num *big.Int, time uint64) bool {
return env.ChainCfg.IsCancun(num, time)
}

func ResetEmptyBlobAncientTable(db ethdb.AncientStore, next uint64) error {
func ResetEmptyBlobAncientTable(db ethdb.AncientWriter, next uint64) error {
return db.ResetTable(ChainFreezerBlobSidecarTable, next, next, true)
}
17 changes: 5 additions & 12 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,11 @@ func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace str
}
// if there has blobs, it needs to back up too.
blobs := rawdb.ReadRawBlobSidecars(chainDb, blockHash, blockNumber)
if blobs != nil {
// Write into new ancient_back db.
if _, err := rawdb.WriteAncientBlocksWithSidecars(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, td, []types.BlobSidecars{blobs}); err != nil {
log.Error("failed to write new ancient", "error", err)
return err
}
} else {
// Write into new ancient_back db.
if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, td); err != nil {
log.Error("failed to write new ancient", "error", err)
return err
}
block.WithSidecars(blobs)
// Write into new ancient_back db.
if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, td); err != nil {
log.Error("failed to write new ancient", "error", err)
return err
}
// Print the log every 5s for better trace.
if common.PrettyDuration(time.Since(start)) > common.PrettyDuration(5*time.Second) {
Expand Down
2 changes: 1 addition & 1 deletion core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (b *Block) Sidecars() BlobSidecars {
}

func (b *Block) CleanSidecars() {
b.sidecars = nil
b.sidecars = make(BlobSidecars, 0)
}

type writeCounter uint64
Expand Down
4 changes: 4 additions & 0 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,10 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
// env.receipts = receipts
finalizeBlockTimer.UpdateSince(finalizeStart)

// If Cancun enabled, sidecars can't be nil then.
if w.chainConfig.IsCancun(env.header.Number, env.header.Time) && env.sidecars == nil {
env.sidecars = make(types.BlobSidecars, 0)
}
// Create a local environment copy, avoid the data race with snapshot state.
// https://github.com/ethereum/go-ethereum/issues/24299
env := env.copy()
Expand Down

0 comments on commit 5df5f71

Please sign in to comment.