Skip to content

Commit

Permalink
feat: add eth 68 protocol (#117)
Browse files Browse the repository at this point in the history
* fix: eth 68 protocol

* fix: test failure

* fix: test failure

* fix: protocol name for eth68

* fix: support eth protocol name

* fix: lint error

* fix: use  protocol name when handshaking

* fix: protocol name

* fix: protocol match bug

* fix: not load NCPExit contract

* fix: apply comment
  • Loading branch information
egonspace authored Sep 11, 2024
1 parent ccb75b6 commit 543e610
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 48 deletions.
23 changes: 18 additions & 5 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,17 +755,20 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
}

// generate 50 txs
hashMap, _, err := generateTxs(s, 50)
_, txs, err := generateTxs(s, 50)
if err != nil {
t.Fatalf("failed to generate transactions: %v", err)
}

// create new pooled tx hashes announcement
hashes := make([]common.Hash, 0)
for _, hash := range hashMap {
hashes = append(hashes, hash)
hashes := make([]common.Hash, len(txs))
types := make([]byte, len(txs))
sizes := make([]uint32, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
types[i] = tx.Type()
sizes[i] = uint32(tx.Size())
}
announce := NewPooledTransactionHashes(hashes)

// send announcement
conn, err := s.dial66()
Expand All @@ -776,6 +779,14 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
if err = conn.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}

var announce Message
if conn.negotiatedProtoVersion == eth.ETH68 {
announce = NewPooledTransactionHashes68{Types: types}
} else {
announce = NewPooledTransactionHashes(hashes)
}

if err = conn.Write(announce); err != nil {
t.Fatalf("failed to write to connection: %v", err)
}
Expand All @@ -792,6 +803,8 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
// ignore propagated txs from previous tests
case *NewPooledTransactionHashes:
continue
case *NewPooledTransactionHashes68:
continue
// ignore block announcements from previous tests
case *NewBlockHashes:
continue
Expand Down
4 changes: 4 additions & 0 deletions cmd/devp2p/internal/ethtest/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket

func (nb NewPooledTransactionHashes) Code() int { return 24 }

type NewPooledTransactionHashes68 eth.NewPooledTransactionHashesPacket68

func (nb NewPooledTransactionHashes68) Code() int { return 24 }

type GetPooledTransactions eth.GetPooledTransactionsPacket

func (gpt GetPooledTransactions) Code() int { return 25 }
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu
return rlpHeaders
}
// read remaining from ancients
max := count * 700
data, err := db.AncientRange(freezerHeaderTable, i+1-count, count, max)
maxBytes := count * 8000 // WEMIX mainnet block header size is 4K bytes in normal
data, err := db.AncientRange(freezerHeaderTable, i+1-count, count, maxBytes)
if err == nil && uint64(len(data)) == count {
// the data is on the order [h, h+1, .., n] -- reordering needed
for i := range data {
Expand Down
3 changes: 3 additions & 0 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
case *eth.NewPooledTransactionHashesPacket:
return h.txFetcher.Notify(peer.ID(), *packet)

case *eth.NewPooledTransactionHashesPacket68:
return h.txFetcher.Notify(peer.ID(), packet.Hashes)

case *eth.TransactionsPacket:
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

Expand Down
11 changes: 10 additions & 1 deletion eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
h.txAnnounces.Send(([]common.Hash)(*packet))
return nil

case *eth.NewPooledTransactionHashesPacket68:
h.txAnnounces.Send(packet.Hashes)
return nil

case *eth.TransactionsPacket:
h.txBroadcasts.Send(([]*types.Transaction)(*packet))
return nil
Expand All @@ -82,6 +86,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// fork IDs in the protocol handshake.
func TestForkIDSplit65(t *testing.T) { testForkIDSplit(t, eth.ETH65) }
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }

func testForkIDSplit(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -240,6 +245,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
// Tests that received transactions are added to the local pool.
func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, eth.ETH65) }
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }

func testRecvTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -297,6 +303,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {

// This test checks that pending transactions are sent.
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }

func testSendTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -355,7 +362,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
seen := make(map[common.Hash]struct{})
for len(seen) < len(insert) {
switch protocol {
case 66:
case 66, 68:
select {
case hashes := <-anns:
for _, hash := range hashes {
Expand Down Expand Up @@ -383,6 +390,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
// broadcasts or via announcements/retrievals.
func TestTransactionPropagation65(t *testing.T) { testTransactionPropagation(t, eth.ETH65) }
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }

func testTransactionPropagation(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -690,6 +698,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// with the hashes in the header) gets discarded and not broadcast forward.
func TestBroadcastMalformedBlock65(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH65) }
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) }

func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
25 changes: 18 additions & 7 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() {
if done == nil && len(queue) > 0 {
// Pile transaction hashes until we reach our allowed network limit
var (
count int
pending []common.Hash
size common.StorageSize
count int
pending []common.Hash
pendingTypes []byte
pendingSizes []uint32
size common.StorageSize
)
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
if p.txpool.Get(queue[count]) != nil {
if tx := p.txpool.Get(queue[count]); tx != nil {
pending = append(pending, queue[count])
pendingTypes = append(pendingTypes, tx.Type())
pendingSizes = append(pendingSizes, uint32(tx.Size()))
size += common.HashLength
}
}
Expand All @@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() {
if len(pending) > 0 {
done = make(chan struct{})
go func() {
if err := p.sendPooledTransactionHashes(pending); err != nil {
fail <- err
return
if p.version >= ETH68 {
if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
fail <- err
return
}
} else {
if err := p.sendPooledTransactionHashes(pending); err != nil {
fail <- err
return
}
}
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending))
Expand Down
38 changes: 34 additions & 4 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,17 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2
protocols := make([]p2p.Protocol, len(ProtocolVersions))
for i, version := range ProtocolVersions {
version := version // Closure

var matchFunc func(cap p2p.Cap) bool = nil
var representativeNameFunc func() string = nil
if version == ETH68 {
matchFunc = func(cap p2p.Cap) bool {
return (cap.Name == ProtocolName || cap.Name == ProtocolAlias) && cap.Version == ETH68
}
representativeNameFunc = func() string {
// When WEMIX node does p2p.handshaking, it sends `eth` for the `eth68` protocol name instead of `mir`
return ProtocolAlias
}
}
protocols[i] = p2p.Protocol{
Name: ProtocolName,
Version: version,
Expand All @@ -120,8 +130,10 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2
PeerInfo: func(id enode.ID) interface{} {
return backend.PeerInfo(id)
},
Attributes: []enr.Entry{currentENREntry(backend.Chain())},
DialCandidates: dnsdisc,
Attributes: []enr.Entry{currentENREntry(backend.Chain())},
DialCandidates: dnsdisc,
Match: matchFunc,
RepresentativeName: representativeNameFunc,
}
}
return protocols
Expand Down Expand Up @@ -216,6 +228,21 @@ var eth66 = map[uint64]msgHandler{
TransactionsExMsg: handleTransactionsEx,
}

var eth68 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
}

// handleMessage is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func handleMessage(backend Backend, peer *Peer) error {
Expand All @@ -230,9 +257,12 @@ func handleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()

var handlers = eth65
if peer.Version() >= ETH66 {
if peer.Version() == ETH66 {
handlers = eth66
}
if peer.Version() >= ETH68 {
handlers = eth68
}

// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
Expand Down
21 changes: 16 additions & 5 deletions eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (b *testBackend) Handle(*Peer, Packet) error {
// Tests that block headers can be retrieved from a remote chain based on user queries.
func TestGetBlockHeaders65(t *testing.T) { testGetBlockHeaders(t, ETH65) }
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) }

func testGetBlockHeaders(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -312,6 +313,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
// Tests that block contents can be retrieved from a remote chain based on their hashes.
func TestGetBlockBodies65(t *testing.T) { testGetBlockBodies(t, ETH65) }
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) }

func testGetBlockBodies(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -401,10 +403,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}

// Tests that the state trie nodes can be retrieved based on hashes.
func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65, false) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) }
func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) }

func testGetNodeData(t *testing.T, protocol uint) {
func testGetNodeData(t *testing.T, protocol uint, drop bool) {
t.Parallel()

// Define three accounts to simulate transactions with
Expand Down Expand Up @@ -469,8 +472,15 @@ func testGetNodeData(t *testing.T, protocol uint) {
})
}
msg, err := peer.app.ReadMsg()
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
if !drop {
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
}
} else {
if err != nil {
return
}
t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg)
}
if msg.Code != NodeDataMsg {
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
Expand Down Expand Up @@ -524,6 +534,7 @@ func testGetNodeData(t *testing.T, protocol uint) {
// Tests that the transaction receipts can be retrieved based on hashes.
func TestGetBlockReceipts65(t *testing.T) { testGetBlockReceipts(t, ETH65) }
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) }

func testGetBlockReceipts(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
39 changes: 28 additions & 11 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,7 @@ func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
}
f := func() error {
response := ServiceGetBlockHeadersQuery(backend.Chain(), query.GetBlockHeadersPacket, peer)
if len(response) == int(query.GetBlockHeadersPacket.Amount) {
return peer.ReplyBlockHeadersRLP(query.RequestId, response)
} else {
// Wemix: fall back to old behavior
response2 := answerGetBlockHeadersQuery(backend, query.GetBlockHeadersPacket, peer)
if len(response2) > len(response) {
return peer.ReplyBlockHeaders(query.RequestId, response2)
} else {
return peer.ReplyBlockHeadersRLP(query.RequestId, response)
}
}
return peer.ReplyBlockHeadersRLP(query.RequestId, response)
}
if params.ConsensusMethod == params.ConsensusPoW {
return f()
Expand Down Expand Up @@ -798,6 +788,33 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
return nil
}

func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
ann := new(NewPooledTransactionHashesPacket68)
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes))
}
f := func() error {
// Schedule all the unknown hashes for retrieval
for _, hash := range ann.Hashes {
peer.markTransaction(hash)
}
return backend.Handle(peer, ann)
}
if params.ConsensusMethod == params.ConsensusPoW {
return f()
}
go f()
return nil
}

func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket
Expand Down
6 changes: 6 additions & 0 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
}

func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes})
}

// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
// announce to a remote peer. The number of pending sends are capped (new ones
// will force old sends to be dropped)
Expand Down
2 changes: 2 additions & 0 deletions eth/protocols/eth/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
errc := make(chan error, 1)
go func() {
defer app.Close()

errc <- backend.RunPeer(peer, func(peer *Peer) error {
return Handle(backend, peer)
})
Expand Down
Loading

0 comments on commit 543e610

Please sign in to comment.