Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blobtx mining + brodcasting #2253

Merged
merged 25 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
468c53c
ci: temp enable blobtx branch ci run;
galaio Feb 19, 2024
db55d1f
Switch ON blobpool & ensure Cancun hardfork can occur (#2223)
emailtovamos Feb 21, 2024
8e6bafe
feat: support blob storage & miscs; (#2229)
galaio Mar 4, 2024
f135676
miner: set sidecar in block + todo
emailtovamos Mar 4, 2024
e8ecd22
types: setsidecars
emailtovamos Mar 4, 2024
023e796
eth: NewBlockWithBlobMsg+NewBlockWithBlobPacket
emailtovamos Mar 4, 2024
33305b1
eth: AsyncSendNewBlockAndBlob
emailtovamos Mar 5, 2024
6eef58a
eth: enqueue block with blobs to be inserted later
emailtovamos Mar 6, 2024
59a33e9
eth: remove NewBlockWithBlobMsg
emailtovamos Mar 7, 2024
f0ed1d4
eth: blockbodies todo
emailtovamos Mar 7, 2024
123eab6
eth: todos
emailtovamos Mar 8, 2024
4c12054
Merge branch 'blobtx' of https://github.com/bnb-chain/bsc into blobtx
emailtovamos Mar 8, 2024
a2b196f
Merge branch 'blobtx' into blobtx-mining
emailtovamos Mar 8, 2024
653e210
eth: blockBodiesMsg related changes part 1
emailtovamos Mar 11, 2024
eca640b
params: version + remove unwanted comments
emailtovamos Mar 11, 2024
851ae4a
eth: remove NewBlockWithBlobPacket
emailtovamos Mar 12, 2024
cfb5800
eth: pointer for sidecar
emailtovamos Mar 12, 2024
978948c
eth: remove version
emailtovamos Mar 12, 2024
45ec15f
Merge branch 'blobtx' of https://github.com/bnb-chain/bsc into blobtx
emailtovamos Mar 12, 2024
c673b3e
Merge branch 'blobtx' into blobtx-mining
emailtovamos Mar 12, 2024
0effec5
eth: remove version
emailtovamos Mar 12, 2024
153f603
eth: block bodies changes for downloader
emailtovamos Mar 12, 2024
9a99cc5
eth: remove old todos
emailtovamos Mar 12, 2024
aaccf9a
core: remove blobversion
emailtovamos Mar 12, 2024
10cfa61
eth: remove blockAndBlobPropagation
emailtovamos Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/types/tx_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ type BlobTx struct {

type BlobTxSidecars []*BlobTxSidecar

// Len returns the length of s.
func (s BlobTxSidecars) Len() int { return len(s) }

// EncodeIndex encodes the i'th BlobTxSidecar to w. Note that this does not check for errors
// because we assume that BlobTxSidecars will only ever contain valid sidecars
func (s BlobTxSidecars) EncodeIndex(i int, w *bytes.Buffer) {
rlp.Encode(w, s[i])
}

// BlobTxSidecar contains the blobs of a blob transaction.
type BlobTxSidecar struct {
Blobs []kzg4844.Blob // Blobs needed by the blob pool
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
)
blocks := make([]*types.Block, len(results))
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals)
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars)
}
// Downloaded blocks are always regarded as trusted after the
// transition. Because the downloaded chain is guided by the
Expand Down Expand Up @@ -1599,7 +1599,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
blocks := make([]*types.Block, len(results))
receipts := make([]types.Receipts, len(results))
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals)
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars)
receipts[i] = result.Receipts
}
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
Expand All @@ -1610,7 +1610,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
}

func (d *Downloader) commitPivotBlock(result *fetchResult) error {
block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals)
block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars)
log.Debug("Committing snap sync pivot as new head", "number", block.Number(), "hash", block.Hash())

// Commit the pivot block as the new head, will require full sync from here on
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/fetchers_concurrent_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan
// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the body data and delivering it to the downloader's queue.
func (q *bodyQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
txs, uncles, withdrawals := packet.Res.(*eth.BlockBodiesResponse).Unpack()
txs, uncles, withdrawals, sidecars := packet.Res.(*eth.BlockBodiesResponse).Unpack()
hashsets := packet.Meta.([][]common.Hash) // {txs hashes, uncle hashes, withdrawal hashes}

accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2])
accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2], sidecars)
switch {
case err == nil && len(txs) == 0:
peer.log.Trace("Requested bodies delivered")
Expand Down
4 changes: 3 additions & 1 deletion eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type fetchResult struct {
Transactions types.Transactions
Receipts types.Receipts
Withdrawals types.Withdrawals
Sidecars types.BlobTxSidecars
}

func newFetchResult(header *types.Header, fastSync bool, pid string) *fetchResult {
Expand Down Expand Up @@ -776,7 +777,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []comm
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListHashes []common.Hash,
uncleLists [][]*types.Header, uncleListHashes []common.Hash,
withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash) (int, error) {
withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash, sidecars [][]*types.BlobTxSidecar) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()

Expand Down Expand Up @@ -838,6 +839,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
result.Transactions = txLists[index]
result.Uncles = uncleLists[index]
result.Withdrawals = withdrawalLists[index]
result.Sidecars = sidecars[index]
result.SetBodyDone()
}
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func XTestDelivery(t *testing.T) {
uncleHashes[i] = types.CalcUncleHash(uncles)
}
time.Sleep(100 * time.Millisecond)
_, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil)
_, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil, nil)
if err != nil {
fmt.Printf("delivered %d bodies %v\n", len(txset), err)
}
Expand Down
3 changes: 2 additions & 1 deletion eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,8 @@ func (f *BlockFetcher) loop() {
case res := <-resCh:
res.Done <- nil
// Ignoring withdrawals here, since the block fetcher is not used post-merge.
txs, uncles, _ := res.Res.(*eth.BlockBodiesResponse).Unpack()
// todo 4844 is it ok to ignore sidecars here too?
txs, uncles, _, _ := res.Res.(*eth.BlockBodiesResponse).Unpack()
f.FilterBodies(peer, txs, uncles, time.Now())

case <-timeout.C:
Expand Down
1 change: 1 addition & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
} else {
transfer = peers[:int(math.Sqrt(float64(len(peers))))]
}

for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td)
}
Expand Down
11 changes: 9 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.handleBlockAnnounces(peer, hashes, numbers)

case *eth.NewBlockPacket:
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)
return h.handleBlockBroadcast(peer, packet)

case *eth.NewPooledTransactionHashesPacket67:
return h.txFetcher.Notify(peer.ID(), nil, nil, *packet)
Expand Down Expand Up @@ -118,13 +118,20 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,

// handleBlockBroadcast is invoked from a peer's message handler when it transmits a
// block broadcast for the local node to process.
func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error {
func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPacket) error {
// Drop all incoming block announces from the p2p network if
// the chain already entered the pos stage and disconnect the
// remote peer.
if h.merger.PoSFinalized() {
return errors.New("disallowed block broadcast")
}
block := packet.Block
td := packet.TD
sidecars := packet.Sidecars
if sidecars != nil {
block = block.WithBlobs(sidecars)
}

// Schedule the block for import
h.blockFetcher.Enqueue(peer.ID(), block)
galaio marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
11 changes: 8 additions & 3 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const (
// blockPropagation is a block propagation event, waiting for its turn in the
// broadcast queue.
type blockPropagation struct {
block *types.Block
td *big.Int
block *types.Block
td *big.Int
sidecars types.BlobTxSidecars `rlp:"optional"`
}

// broadcastBlocks is a write loop that multiplexes blocks and block announcements
Expand All @@ -47,7 +48,11 @@ func (p *Peer) broadcastBlocks() {
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
}
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
if len(prop.sidecars) > 0 {
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td, "sidecars", prop.sidecars.Len())
} else {
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
}

case block := <-p.queuedBlockAnns:
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
Expand Down
162 changes: 161 additions & 1 deletion eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package eth

import (
rand2 "crypto/rand"
"io"
"math"
"math/big"
"math/rand"
Expand All @@ -33,10 +35,14 @@
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth/protocols/bsc"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)

var (
Expand Down Expand Up @@ -146,7 +152,7 @@
panic("data processing tests should be done in the handler package")
}
func (b *testBackend) Handle(*Peer, Packet) error {
panic("data processing tests should be done in the handler package")
return nil
}

// Tests that block headers can be retrieved from a remote chain based on user queries.
Expand Down Expand Up @@ -502,3 +508,157 @@
t.Errorf("receipts mismatch: %v", err)
}
}

func TestHandleNewBlock(t *testing.T) {
t.Parallel()

gen := func(n int, g *core.BlockGen) {
if n%2 == 0 {
w := &types.Withdrawal{
Address: common.Address{0xaa},
Amount: 42,
}
g.AddWithdrawal(w)
}
}

backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen)
defer backend.close()

peer, _ := newTestPeer("peer", ETH68, backend)
defer peer.close()

v := new(uint32)
*v = 1
genBlobs := makeBlkBlobs(1, 2)
tx1 := types.NewTx(&types.BlobTx{
ChainID: new(uint256.Int).SetUint64(1),
GasTipCap: new(uint256.Int),
GasFeeCap: new(uint256.Int),
Gas: 0,
Value: new(uint256.Int),
Data: nil,
BlobFeeCap: new(uint256.Int),
BlobHashes: []common.Hash{common.HexToHash("0x34ec6e64f9cda8fe0451a391e4798085a3ef51a65ed1bfb016e34fc1a2028f8f"), common.HexToHash("0xb9a412e875f29fac436acde234f954e91173c4cf79814f6dcf630d8a6345747f")},
Sidecar: genBlobs[0],
V: new(uint256.Int),
R: new(uint256.Int),
S: new(uint256.Int),
})
sidecars := types.BlobTxSidecars{tx1.BlobTxSidecar()}
block := types.NewBlockWithHeader(&types.Header{
Number: big.NewInt(0),
Extra: []byte("test block"),
UncleHash: types.EmptyUncleHash,
TxHash: types.EmptyTxsHash,
ReceiptHash: types.EmptyReceiptsHash,
})
dataNil := NewBlockPacket{
Block: block,
TD: big.NewInt(1),
Sidecars: nil,
}
dataNonNil := NewBlockPacket{
Block: block,
TD: big.NewInt(1),
Sidecars: sidecars,
}
sizeNonNil, rNonNil, _ := rlp.EncodeToReader(dataNonNil)
sizeNil, rNil, _ := rlp.EncodeToReader(dataNil)

// Define the test cases
testCases := []struct {
name string
msg p2p.Msg
err error
}{
{
name: "Valid block",
msg: p2p.Msg{
Code: 1,
Size: uint32(sizeNonNil),
Payload: rNonNil,
},
err: nil,
},
{
name: "Nil sidecars",
msg: p2p.Msg{
Code: 2,
Size: uint32(sizeNil),
Payload: rNil,
},
err: nil,
},
}

protos := []p2p.Protocol{
{
Name: "eth",
Version: ETH67,
},
{
Name: "eth",
Version: ETH68,
},
{
Name: "bsc",
Version: bsc.Bsc1,
},
}
caps := []p2p.Cap{
{
Name: "eth",
Version: ETH67,
},
{
Name: "eth",
Version: ETH68,
},
{
Name: "bsc",
Version: bsc.Bsc1,
},
}
// Create a source handler to send messages through and a sink peer to receive them
p2pEthSrc, p2pEthSink := p2p.MsgPipe()
defer p2pEthSrc.Close()
defer p2pEthSink.Close()

localEth := NewPeer(ETH68, p2p.NewPeerWithProtocols(enode.ID{1}, protos, "", caps), p2pEthSrc, nil)

// Run the tests
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
err := handleNewBlock(backend, tc.msg, localEth)
if err != tc.err {
t.Errorf("expected error %v, got %v", tc.err, err)
}
})
}

Check failure on line 640 in eth/protocols/eth/handler_test.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.21.x, ubuntu-latest)

unnecessary trailing newline (whitespace)
}

func makeBlkBlobs(n, nPerTx int) types.BlobTxSidecars {
if n <= 0 {
return nil
}
ret := make(types.BlobTxSidecars, n)
for i := 0; i < n; i++ {
blobs := make([]kzg4844.Blob, nPerTx)
commitments := make([]kzg4844.Commitment, nPerTx)
proofs := make([]kzg4844.Proof, nPerTx)
for i := 0; i < nPerTx; i++ {
io.ReadFull(rand2.Reader, blobs[i][:])
commitments[i], _ = kzg4844.BlobToCommitment(blobs[i])
proofs[i], _ = kzg4844.ComputeBlobProof(blobs[i], commitments[i])
}
ret[i] = &types.BlobTxSidecar{
Blobs: blobs,
Commitments: commitments,
Proofs: proofs,
}
}
return ret
}
23 changes: 20 additions & 3 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,24 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
lookups >= 2*maxBodiesServe {
break
}
if data := chain.GetBodyRLP(hash); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
body := chain.GetBody(hash)
if body == nil {
continue
}
blobs := chain.GetBlobsByHash(hash)
bodyWithBlobs := &BlockBody{
Transactions: body.Transactions,
Uncles: body.Uncles,
Withdrawals: body.Withdrawals,
Sidecars: blobs,
}
enc, err := rlp.EncodeToBytes(bodyWithBlobs)
if err != nil {
log.Error("block body encode err", "hash", hash, "err", err)
continue
}
bodies = append(bodies, enc)
bytes += len(enc)
}
return bodies
}
Expand Down Expand Up @@ -293,9 +307,12 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}

// Now that we have our packet, perform operations using the interface methods
if err := ann.sanityCheck(); err != nil {
return err
}

if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() {
log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash())
return nil // TODO(karalabe): return error eventually, but wait a few releases
Copy link
Contributor

@NathanBSC NathanBSC Mar 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func handleBlockHeaders need some changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Header doesn't carry sidecar specific things. What change should be made?

Copy link
Contributor

@NathanBSC NathanBSC Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I mean handleBlockBodies

Expand Down
Loading
Loading