Skip to content

Commit

Permalink
address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
roberto-bayardo authored and rjl493456442 committed Aug 13, 2024
1 parent 41c7cc6 commit 81798d8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 51 deletions.
2 changes: 1 addition & 1 deletion cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
if code, _, err := conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err: %v", err)
} else if code != discMsg {
if code == 24 {
if code == protoOffset(ethProto)+eth.NewPooledTransactionHashesMsg {
// sometimes we'll get a blob transaction hashes announcement before the disconnect
// because blob transactions are scheduled to be fetched right away.
if code, _, err = conn.Read(); err != nil {
Expand Down
72 changes: 22 additions & 50 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ var errTerminated = errors.New("terminated")
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
<<<<<<< HEAD
metas []*txMetadata // Batch of metadata associated with the hashes
=======
metas []txMetadata // Batch of metadatas associated with the hashes
>>>>>>> ddf2fd4314 (- fetch transactions from a peer in the order they were announced to minimize nonce-gaps (which cause blob txs to be rejected))
metas []txMetadata // Batch of metadata associated with the hashes
}

// txMetadata provides the extra data transmitted along with the announcement
Expand Down Expand Up @@ -181,14 +177,14 @@ type TxFetcher struct {

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]txMetadata // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand Down Expand Up @@ -226,8 +222,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]txMetadata),
announces: make(map[string]map[common.Hash]txMetadata),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand Down Expand Up @@ -274,20 +270,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
underpriced++
default:
unknownHashes = append(unknownHashes, hash)
<<<<<<< HEAD
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
=======
if types == nil {
unknownMetas = append(unknownMetas, txMetadata{arrival: f.counter})
} else {
if sizes[i] == 0 {
// invalid size parameter, return error
return fmt.Errorf("announcement from tx %x had an invalid 0 size metadata", hash)
}
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i], arrival: f.counter})
}
f.counter++
>>>>>>> ddf2fd4314 (- fetch transactions from a peer in the order they were announced to minimize nonce-gaps (which cause blob txs to be rejected))
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
}
}
txAnnounceKnownMeter.Mark(duplicate)
Expand All @@ -298,7 +281,6 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
return nil
}
announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
f.counter++
select {
case f.notify <- announce:
return nil
Expand Down Expand Up @@ -467,9 +449,9 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
announces[hash] = &ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
}
continue
}
Expand All @@ -480,9 +462,9 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
announces[hash] = &ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
}
continue
}
Expand All @@ -495,9 +477,9 @@ func (f *TxFetcher) loop() {
if ann.metas[i].kind == types.BlobTxType && f.waitlist[hash] == nil {
f.announced[hash] = map[string]struct{}{ann.origin: {}}
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
announces[hash] = &ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
}
f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
continue
Expand All @@ -515,9 +497,9 @@ func (f *TxFetcher) loop() {
f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
waitslots[hash] = &ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
}
continue
}
Expand All @@ -526,9 +508,9 @@ func (f *TxFetcher) loop() {
f.waittime[hash] = f.clock.Now()

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
waitslots[hash] = &ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
Expand Down Expand Up @@ -556,7 +538,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]txMetadata{hash: f.waitslots[peer][hash]}
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -932,18 +914,8 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
if len(hashes) >= maxTxRetrievals {
return false // break in the for-each
}
<<<<<<< HEAD
bytes += uint64(meta.size)
return bytes < maxTxRetrievalSize
=======
if meta.size != 0 { // Only set eth/68 and upwards
bytes += uint64(meta.size)
if bytes >= maxTxRetrievalSize {
return false
}
}
return true // scheduled, try to add more
>>>>>>> ddf2fd4314 (- fetch transactions from a peer in the order they were announced to minimize nonce-gaps (which cause blob txs to be rejected))
})
// If any hashes were allocated, request them from the peer
if len(hashes) > 0 {
Expand Down Expand Up @@ -992,15 +964,15 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
// the do function for each until it returns false. We enforce an arrival
// ordering to minimize the chances of mempool nonce-gaps, which result in blob
// transactions being rejected by the mempool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]txMetadata, do func(hash common.Hash, meta txMetadata) bool) {
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta txMetadata) bool) {
type announcement struct {
hash common.Hash
meta txMetadata
}
// process announcements by their arrival order
list := make([]announcement, 0, len(announces))
for hash, metadata := range announces {
list = append(list, announcement{hash: hash, meta: metadata})
list = append(list, announcement{hash: hash, meta: *metadata})
}
sort.Slice(list, func(i, j int) bool {
return list[i].meta.arrival < list[j].meta.arrival
Expand Down

0 comments on commit 81798d8

Please sign in to comment.