Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beacon/engine, core/txpool, eth/catalyst: add engine_getBlobsV1 API #30537

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions beacon/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ type BlobsBundleV1 struct {
Blobs []hexutil.Bytes `json:"blobs"`
}

type BlobAndProofV1 struct {
Blob hexutil.Bytes `json:"blob"`
Proof hexutil.Bytes `json:"proof"`
}

// JSON type overrides for ExecutionPayloadEnvelope.
type executionPayloadEnvelopeMarshaling struct {
BlockValue *hexutil.Big
Expand Down
104 changes: 79 additions & 25 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -88,9 +89,11 @@ const (
// bare minimum needed fields to keep the size down (and thus number of entries
// larger with the same memory consumption).
type blobTxMeta struct {
hash common.Hash // Transaction hash to maintain the lookup table
id uint64 // Storage ID in the pool's persistent store
size uint32 // Byte size in the pool's persistent store
hash common.Hash // Transaction hash to maintain the lookup table
vhashes []common.Hash // Blob versioned hashes to maintain the lookup table

id uint64 // Storage ID in the pool's persistent store
size uint32 // Byte size in the pool's persistent store

nonce uint64 // Needed to prioritize inclusion order within an account
costCap *uint256.Int // Needed to validate cumulative balance sufficiency
Expand All @@ -113,6 +116,7 @@ type blobTxMeta struct {
func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
meta := &blobTxMeta{
hash: tx.Hash(),
vhashes: tx.BlobHashes(),
id: id,
size: size,
nonce: tx.Nonce(),
Expand Down Expand Up @@ -306,7 +310,7 @@ type BlobPool struct {
state *state.StateDB // Current state at the head of the chain
gasTip *uint256.Int // Currently accepted minimum gas tip

lookup map[common.Hash]uint64 // Lookup table mapping hashes to tx billy entries
lookup *lookup // Lookup table mapping blobs to txs and txs to billy entries
index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full
Expand All @@ -328,7 +332,7 @@ func New(config Config, chain BlockChain) *BlobPool {
config: config,
signer: types.LatestSigner(chain.Config()),
chain: chain,
lookup: make(map[common.Hash]uint64),
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
}
Expand Down Expand Up @@ -471,7 +475,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
}

meta := newBlobTxMeta(id, size, tx)
if _, exists := p.lookup[meta.hash]; exists {
if p.lookup.exists(meta.hash) {
// This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get
// partially resurrected.
Expand All @@ -496,9 +500,8 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
p.index[sender] = append(p.index[sender], meta)
p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap)

p.lookup[meta.hash] = meta.id
p.lookup.track(meta)
p.stored += uint64(meta.size)

return nil
}

Expand Down Expand Up @@ -531,7 +534,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
nonces = append(nonces, txs[i].nonce)

p.stored -= uint64(txs[i].size)
delete(p.lookup, txs[i].hash)
p.lookup.untrack(txs[i])

// Included transactions blobs need to be moved to the limbo
if filled && inclusions != nil {
Expand Down Expand Up @@ -572,7 +575,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap)
p.stored -= uint64(txs[0].size)
delete(p.lookup, txs[0].hash)
p.lookup.untrack(txs[0])

// Included transactions blobs need to be moved to the limbo
if inclusions != nil {
Expand Down Expand Up @@ -621,14 +624,14 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
// crash would result in previously deleted entities being resurrected.
// That could potentially cause a duplicate nonce to appear.
if txs[i].nonce == txs[i-1].nonce {
id := p.lookup[txs[i].hash]
id, _ := p.lookup.storeidOfTx(txs[i].hash)

log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id)
dropRepeatedMeter.Mark(1)

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
p.stored -= uint64(txs[i].size)
delete(p.lookup, txs[i].hash)
p.lookup.untrack(txs[i])

if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
Expand All @@ -650,7 +653,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap)
p.stored -= uint64(txs[j].size)
delete(p.lookup, txs[j].hash)
p.lookup.untrack(txs[j])
}
txs = txs[:i]

Expand Down Expand Up @@ -688,7 +691,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
p.stored -= uint64(last.size)
delete(p.lookup, last.hash)
p.lookup.untrack(last)
}
if len(txs) == 0 {
delete(p.index, addr)
Expand Down Expand Up @@ -728,7 +731,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
p.stored -= uint64(last.size)
delete(p.lookup, last.hash)
p.lookup.untrack(last)
}
p.index[addr] = txs

Expand Down Expand Up @@ -1006,7 +1009,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
p.index[addr] = append(p.index[addr], meta)
p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap)
}
p.lookup[meta.hash] = meta.id
p.lookup.track(meta)
p.stored += uint64(meta.size)
return nil
}
Expand All @@ -1033,7 +1036,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
)
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
p.stored -= uint64(tx.size)
delete(p.lookup, tx.hash)
p.lookup.untrack(tx)
txs[i] = nil

// Drop everything afterwards, no gaps allowed
Expand All @@ -1043,7 +1046,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap)
p.stored -= uint64(tx.size)
delete(p.lookup, tx.hash)
p.lookup.untrack(tx)
txs[i+1+j] = nil
}
// Clear out the dropped transactions from the index
Expand Down Expand Up @@ -1171,8 +1174,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()

_, ok := p.lookup[hash]
return ok
return p.lookup.exists(hash)
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
Expand All @@ -1189,7 +1191,7 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
}(time.Now())

// Pull the blob from disk and return an assembled response
id, ok := p.lookup[hash]
id, ok := p.lookup.storeidOfTx(hash)
if !ok {
return nil
}
Expand All @@ -1206,6 +1208,58 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
return item
}

// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) {
// Create a map of the blob hash to indices for faster fills
var (
blobs = make([]*kzg4844.Blob, len(vhashes))
proofs = make([]*kzg4844.Proof, len(vhashes))
)
index := make(map[common.Hash]int)
for i, vhash := range vhashes {
index[vhash] = i
}
// Iterate over the blob hashes, pulling transactions that fill it. Take care
// to also fill anything else the transaction might include (probably will).
for i, vhash := range vhashes {
// If already filled by a previous fetch, skip
if blobs[i] != nil {
continue
}
// Unfilled, retrieve the datastore item (in a short lock)
p.lock.RLock()
id, exists := p.lookup.storeidOfBlob(vhash)
if !exists {
p.lock.RUnlock()
continue
}
data, err := p.store.Get(id)
p.lock.RUnlock()

// After releasing the lock, try to fill any blobs requested
if err != nil {
log.Error("Tracked blob transaction missing from store", "id", id, "err", err)
continue
}
item := new(types.Transaction)
if err = rlp.DecodeBytes(data, item); err != nil {
log.Error("Blobs corrupted for traced transaction", "id", id, "err", err)
continue
}
// Fill anything requested, not just the current versioned hash
sidecar := item.BlobTxSidecar()
for j, blobhash := range item.BlobHashes() {
if idx, ok := index[blobhash]; ok {
blobs[idx] = &sidecar.Blobs[j]
proofs[idx] = &sidecar.Proofs[j]
}
}
}
return blobs, proofs
}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
Expand Down Expand Up @@ -1319,8 +1373,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap)
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)

delete(p.lookup, prev.hash)
p.lookup[meta.hash] = meta.id
p.lookup.untrack(prev)
p.lookup.track(meta)
p.stored += uint64(meta.size) - uint64(prev.size)
} else {
// Transaction extends previously scheduled ones
Expand All @@ -1330,7 +1384,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
newacc = true
}
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)
p.lookup[meta.hash] = meta.id
p.lookup.track(meta)
p.stored += uint64(meta.size)
}
// Recompute the rolling eviction fields. In case of a replacement, this will
Expand Down Expand Up @@ -1419,7 +1473,7 @@ func (p *BlobPool) drop() {
p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap)
}
p.stored -= uint64(drop.size)
delete(p.lookup, drop.hash)
p.lookup.untrack(drop)

// Remove the transaction from the pool's eviction heap:
// - If the entire account was dropped, pop off the address
Expand Down
Loading