Skip to content

Commit

Permalink
eth/fetcher: polish the code
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Aug 13, 2024
1 parent 81798d8 commit 7c6747f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 100 deletions.
117 changes: 69 additions & 48 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
mrand "math/rand"
"sort"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -117,13 +118,17 @@ type txAnnounce struct {
}

// txMetadata provides the extra data transmitted along with the announcement
// for better fetch scheduling ('kind' & 'size'), plus an extra field
// ('arrival') to keep track of its order of arrival. 'size==0' can be used to
// test for 0 pre-eth/68 announcements. In this case, kind will also be 0.
// for better fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes, or 0 if the announcement didn't include metadata
arrival uint64 // Value that can be used to sort announcements by order of arrival
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}

// txMetadataWithSeq is a wrapper of transaction metadata with an extra field
// tracking the transaction sequence number.
type txMetadataWithSeq struct {
txMetadata
seq uint64
}

// txRequest represents an in-flight transaction retrieval request destined to
Expand Down Expand Up @@ -171,20 +176,19 @@ type TxFetcher struct {
drop chan *txDrop
quit chan struct{}

txSeq atomic.Uint64 // Unique transaction sequence number
underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)

counter uint64 // counter used to assign arrival order to tx announcements

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand Down Expand Up @@ -222,8 +226,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand Down Expand Up @@ -449,9 +453,17 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
continue
}
Expand All @@ -462,26 +474,18 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
}
continue
}
// If this is a blob tx, schedule it to fetch without being
// waitlisted since blob txs should not be broadcast. If its
// hash is already on the waitlist, it was previously announced
// as a non-blob (or unknown) tx type. In this case we'll just
// eat the delay and continue handling it as a waitlisted tx to
// keep things simple.
if ann.metas[i].kind == types.BlobTxType && f.waitlist[hash] == nil {
f.announced[hash] = map[string]struct{}{ann.origin: {}}
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
continue
}
// If the transaction is already known to the fetcher, but not
Expand All @@ -497,9 +501,17 @@ func (f *TxFetcher) loop() {
f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = &ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
continue
}
Expand All @@ -508,9 +520,17 @@ func (f *TxFetcher) loop() {
f.waittime[hash] = f.clock.Now()

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = &ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: f.txSeq.Add(1),
},
}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
Expand Down Expand Up @@ -538,7 +558,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -612,7 +632,7 @@ func (f *TxFetcher) loop() {
for i, hash := range delivery.hashes {
if _, ok := f.waitlist[hash]; ok {
for peer, txset := range f.waitslots {
if meta, ok := txset[hash]; ok && meta.size != 0 {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
Expand All @@ -638,7 +658,7 @@ func (f *TxFetcher) loop() {
delete(f.waittime, hash)
} else {
for peer, txset := range f.announces {
if meta, ok := txset[hash]; ok && meta.size != 0 {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
Expand Down Expand Up @@ -962,20 +982,21 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))

// forEachAnnounce loops over the given announcements in arrival order, invoking
// the do function for each until it returns false. We enforce an arrival
// ordering to minimize the chances of mempool nonce-gaps, which result in blob
// transactions being rejected by the mempool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta txMetadata) bool) {
// ordering to minimize the chances of transaction nonce-gaps, which result in
// transactions being rejected by the txpool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) {
type announcement struct {
hash common.Hash
meta txMetadata
seq uint64
}
// process announcements by their arrival order
list := make([]announcement, 0, len(announces))
for hash, metadata := range announces {
list = append(list, announcement{hash: hash, meta: *metadata})
for hash, entry := range announces {
list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq})
}
sort.Slice(list, func(i, j int) bool {
return list[i].meta.arrival < list[j].meta.arrival
return list[i].seq < list[j].seq
})
for i := range list {
if !do(list[i].hash, list[i].meta) {
Expand Down
67 changes: 15 additions & 52 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,37 +179,6 @@ func TestTransactionFetcherWaiting(t *testing.T) {
},
}),
isScheduled{tracking: nil, fetching: nil},
// Announce a non-conflicting blob tx, which should immediately go
// to fetching without hitting the waitlist
doTxNotify{peer: "D", hashes: []common.Hash{{0x0b}}, types: []byte{types.BlobTxType}, sizes: []uint32{1000}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x05}, types.LegacyTxType, 555},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
},
}),
isScheduled{
tracking: map[string][]announce{
"D": {{common.Hash{0x0B}, types.BlobTxType, 1000}},
},
fetching: map[string][]common.Hash{
"D": {{0x0B}},
},
},

// Wait for the arrival timeout which should move all expired items
// from the wait list to the scheduler
Expand All @@ -234,21 +203,19 @@ func TestTransactionFetcherWaiting(t *testing.T) {
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
{common.Hash{0x0B}, types.BlobTxType, 1000},
},
},
fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
"A": {{0x02}, {0x05}},
"B": {{0x03}, {0x04}},
"C": {{0x01}},
"D": {{0x0B}},
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
},
},
// Queue up a non-fetchable transaction and then trigger it with a new
// peer (weird case to test 1 line in the fetcher)
doTxNotify{peer: "B", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}},
doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}},
isWaiting(map[string][]announce{
"B": {
"C": {
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
Expand All @@ -265,24 +232,22 @@ func TestTransactionFetcherWaiting(t *testing.T) {
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
{common.Hash{0x0B}, types.BlobTxType, 1000},
},
},
fetching: map[string][]common.Hash{
"A": {{0x02}, {0x05}},
"B": {{0x03}, {0x04}},
"C": {{0x01}},
"D": {{0x0B}},
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
},
},
doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}},
Expand All @@ -297,28 +262,26 @@ func TestTransactionFetcherWaiting(t *testing.T) {
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
{common.Hash{0x0B}, types.BlobTxType, 1000},
},
"E": {
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
},
fetching: map[string][]common.Hash{
"A": {{0x02}, {0x05}},
"B": {{0x03}, {0x04}},
"C": {{0x01}},
"D": {{0x0B}},
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
"E": {{0x06}, {0x07}},
},
},
Expand Down

0 comments on commit 7c6747f

Please sign in to comment.