From 7c6747f0178b60bb9ab3fd4ac13b000cb01dad15 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 13 Aug 2024 13:27:33 +0800 Subject: [PATCH] eth/fetcher: polish the code --- eth/fetcher/tx_fetcher.go | 117 +++++++++++++++++++-------------- eth/fetcher/tx_fetcher_test.go | 67 +++++-------------- 2 files changed, 84 insertions(+), 100 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index fdc1d6b675c2..d69d7e0ad30e 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -22,6 +22,7 @@ import ( "math" mrand "math/rand" "sort" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -117,13 +118,17 @@ type txAnnounce struct { } // txMetadata provides the extra data transmitted along with the announcement -// for better fetch scheduling ('kind' & 'size'), plus an extra field -// ('arrival') to keep track of its order of arrival. 'size==0' can be used to -// test for 0 pre-eth/68 announcements. In this case, kind will also be 0. +// for better fetch scheduling. type txMetadata struct { - kind byte // Transaction consensus type - size uint32 // Transaction size in bytes, or 0 if the announcement didn't include metadata - arrival uint64 // Value that can be used to sort announcements by order of arrival + kind byte // Transaction consensus type + size uint32 // Transaction size in bytes +} + +// txMetadataWithSeq is a wrapper of transaction metadata with an extra field +// tracking the transaction sequence number. +type txMetadataWithSeq struct { + txMetadata + seq uint64 } // txRequest represents an in-flight transaction retrieval request destined to @@ -171,20 +176,19 @@ type TxFetcher struct { drop chan *txDrop quit chan struct{} + txSeq atomic.Uint64 // Unique transaction sequence number underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch) - counter uint64 // counter used to assign arrival order to tx announcements - // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. - waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast - waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist - waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection) + waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast + waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist + waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection) // Stage 2: Queue of transactions that waiting to be allocated to some peer // to be retrieved directly. - announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer - announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash + announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer + announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash // Stage 3: Set of transactions currently being retrieved, some which may be // fulfilled and some rescheduled. Note, this step shares 'announces' from the @@ -222,8 +226,8 @@ func NewTxFetcherForTests( quit: make(chan struct{}), waitlist: make(map[common.Hash]map[string]struct{}), waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]*txMetadata), - announces: make(map[string]map[common.Hash]*txMetadata), + waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), + announces: make(map[string]map[common.Hash]*txMetadataWithSeq), announced: make(map[common.Hash]map[string]struct{}), fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), @@ -449,9 +453,17 @@ func (f *TxFetcher) loop() { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = &ann.metas[i] + announces[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + } } else { - f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} + f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + }, + } } continue } @@ -462,26 +474,18 @@ func (f *TxFetcher) loop() { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = &ann.metas[i] - } else { - f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} - } - continue - } - // If this is a blob tx, schedule it to fetch without being - // waitlisted since blob txs should not be broadcast. If its - // hash is already on the waitlist, it was previously announced - // as a non-blob (or unknown) tx type. In this case we'll just - // eat the delay and continue handling it as a waitlisted tx to - // keep things simple. - if ann.metas[i].kind == types.BlobTxType && f.waitlist[hash] == nil { - f.announced[hash] = map[string]struct{}{ann.origin: {}} - if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = &ann.metas[i] + announces[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + } } else { - f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} + f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + }, + } } - f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}}) continue } // If the transaction is already known to the fetcher, but not @@ -497,9 +501,17 @@ func (f *TxFetcher) loop() { f.waitlist[hash][ann.origin] = struct{}{} if waitslots := f.waitslots[ann.origin]; waitslots != nil { - waitslots[hash] = &ann.metas[i] + waitslots[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + } } else { - f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} + f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + }, + } } continue } @@ -508,9 +520,17 @@ func (f *TxFetcher) loop() { f.waittime[hash] = f.clock.Now() if waitslots := f.waitslots[ann.origin]; waitslots != nil { - waitslots[hash] = &ann.metas[i] + waitslots[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + } } else { - f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} + f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: f.txSeq.Add(1), + }, + } } } // If a new item was added to the waitlist, schedule it into the fetcher @@ -538,7 +558,7 @@ func (f *TxFetcher) loop() { if announces := f.announces[peer]; announces != nil { announces[hash] = f.waitslots[peer][hash] } else { - f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]} + f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]} } delete(f.waitslots[peer], hash) if len(f.waitslots[peer]) == 0 { @@ -612,7 +632,7 @@ func (f *TxFetcher) loop() { for i, hash := range delivery.hashes { if _, ok := f.waitlist[hash]; ok { for peer, txset := range f.waitslots { - if meta, ok := txset[hash]; ok && meta.size != 0 { + if meta := txset[hash]; meta != nil { if delivery.metas[i].kind != meta.kind { log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind) f.dropPeer(peer) @@ -638,7 +658,7 @@ func (f *TxFetcher) loop() { delete(f.waittime, hash) } else { for peer, txset := range f.announces { - if meta, ok := txset[hash]; ok && meta.size != 0 { + if meta := txset[hash]; meta != nil { if delivery.metas[i].kind != meta.kind { log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind) f.dropPeer(peer) @@ -962,20 +982,21 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) // forEachAnnounce loops over the given announcements in arrival order, invoking // the do function for each until it returns false. We enforce an arrival -// ordering to minimize the chances of mempool nonce-gaps, which result in blob -// transactions being rejected by the mempool. -func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta txMetadata) bool) { +// ordering to minimize the chances of transaction nonce-gaps, which result in +// transactions being rejected by the txpool. +func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) { type announcement struct { hash common.Hash meta txMetadata + seq uint64 } // process announcements by their arrival order list := make([]announcement, 0, len(announces)) - for hash, metadata := range announces { - list = append(list, announcement{hash: hash, meta: *metadata}) + for hash, entry := range announces { + list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq}) } sort.Slice(list, func(i, j int) bool { - return list[i].meta.arrival < list[j].meta.arrival + return list[i].seq < list[j].seq }) for i := range list { if !do(list[i].hash, list[i].meta) { diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index d115c6f3ffb3..772310ba0eb6 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -179,37 +179,6 @@ func TestTransactionFetcherWaiting(t *testing.T) { }, }), isScheduled{tracking: nil, fetching: nil}, - // Announce a non-conflicting blob tx, which should immediately go - // to fetching without hitting the waitlist - doTxNotify{peer: "D", hashes: []common.Hash{{0x0b}}, types: []byte{types.BlobTxType}, sizes: []uint32{1000}}, - isWaiting(map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x05}, types.LegacyTxType, 555}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "C": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "D": { - {common.Hash{0x01}, types.LegacyTxType, 999}, - {common.Hash{0x02}, types.BlobTxType, 222}, - }, - }), - isScheduled{ - tracking: map[string][]announce{ - "D": {{common.Hash{0x0B}, types.BlobTxType, 1000}}, - }, - fetching: map[string][]common.Hash{ - "D": {{0x0B}}, - }, - }, // Wait for the arrival timeout which should move all expired items // from the wait list to the scheduler @@ -234,21 +203,19 @@ func TestTransactionFetcherWaiting(t *testing.T) { "D": { {common.Hash{0x01}, types.LegacyTxType, 999}, {common.Hash{0x02}, types.BlobTxType, 222}, - {common.Hash{0x0B}, types.BlobTxType, 1000}, }, }, fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer - "A": {{0x02}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}}, - "D": {{0x0B}}, + "A": {{0x03}, {0x05}}, + "C": {{0x01}, {0x04}}, + "D": {{0x02}}, }, }, // Queue up a non-fetchable transaction and then trigger it with a new // peer (weird case to test 1 line in the fetcher) - doTxNotify{peer: "B", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, + doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, isWaiting(map[string][]announce{ - "B": { + "C": { {common.Hash{0x06}, types.LegacyTxType, 666}, {common.Hash{0x07}, types.LegacyTxType, 777}, }, @@ -265,24 +232,22 @@ func TestTransactionFetcherWaiting(t *testing.T) { "B": { {common.Hash{0x03}, types.LegacyTxType, 333}, {common.Hash{0x04}, types.LegacyTxType, 444}, - {common.Hash{0x06}, types.LegacyTxType, 666}, - {common.Hash{0x07}, types.LegacyTxType, 777}, }, "C": { {common.Hash{0x01}, types.LegacyTxType, 111}, {common.Hash{0x04}, types.LegacyTxType, 444}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, }, "D": { {common.Hash{0x01}, types.LegacyTxType, 999}, {common.Hash{0x02}, types.BlobTxType, 222}, - {common.Hash{0x0B}, types.BlobTxType, 1000}, }, }, fetching: map[string][]common.Hash{ - "A": {{0x02}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}}, - "D": {{0x0B}}, + "A": {{0x03}, {0x05}}, + "C": {{0x01}, {0x04}}, + "D": {{0x02}}, }, }, doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, @@ -297,17 +262,16 @@ func TestTransactionFetcherWaiting(t *testing.T) { "B": { {common.Hash{0x03}, types.LegacyTxType, 333}, {common.Hash{0x04}, types.LegacyTxType, 444}, - {common.Hash{0x06}, types.LegacyTxType, 666}, - {common.Hash{0x07}, types.LegacyTxType, 777}, }, "C": { {common.Hash{0x01}, types.LegacyTxType, 111}, {common.Hash{0x04}, types.LegacyTxType, 444}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, }, "D": { {common.Hash{0x01}, types.LegacyTxType, 999}, {common.Hash{0x02}, types.BlobTxType, 222}, - {common.Hash{0x0B}, types.BlobTxType, 1000}, }, "E": { {common.Hash{0x06}, types.LegacyTxType, 666}, @@ -315,10 +279,9 @@ func TestTransactionFetcherWaiting(t *testing.T) { }, }, fetching: map[string][]common.Hash{ - "A": {{0x02}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}}, - "D": {{0x0B}}, + "A": {{0x03}, {0x05}}, + "C": {{0x01}, {0x04}}, + "D": {{0x02}}, "E": {{0x06}, {0x07}}, }, },