Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify tx response data against request #6439

Merged
merged 48 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d1f8c59
Verify tx response data against request
emhane Feb 6, 2024
31d2a5e
Validate PooledTransactions response and verify against GetPooledTran…
emhane Feb 17, 2024
5c63562
Fix lint and docs
emhane Feb 18, 2024
499a5a1
Trace txns in response that fail verification
emhane Feb 18, 2024
8068e18
Fix lint, docs and tests in validation
emhane Feb 18, 2024
b43916a
Make log message futurue proof when adding more verifications on eth6…
emhane Feb 18, 2024
63db362
Update type name
emhane Feb 18, 2024
cce2ba2
fixup! Update type name
emhane Feb 18, 2024
9cf569c
Fix typo docs
emhane Feb 18, 2024
7903ae2
Fix docs
emhane Feb 18, 2024
f4efd70
Merge branch 'emhane/validate-tx-response' of github.com:paradigmxyz/…
emhane Feb 18, 2024
6acb930
Simplification
emhane Feb 18, 2024
e1c35b2
Advance inflight requests on-op
emhane Feb 14, 2024
ff6bb4a
Advance inflight requests when no requests are being queued
emhane Feb 16, 2024
7ad212d
Only advance inflight requests if no fetch events ready
emhane Feb 16, 2024
693dbca
Clean up and shrink scope
emhane Feb 16, 2024
502f19b
Fix conflicts cherry-picking off emhane/prioritisation-network-manager
emhane Feb 18, 2024
185723b
Fix docs by drive-by opening visibility to make tx fetcher extensible
emhane Feb 18, 2024
e265894
Drive-by, fix lint
emhane Feb 18, 2024
92f5374
Make docs more verbose
emhane Feb 18, 2024
235796b
Fix lint fix bug
emhane Feb 18, 2024
7fc3bf9
Pass context as param to cover edge case no boradcast activity
emhane Feb 18, 2024
5cc593e
Always advance inflight requests on poll
emhane Feb 18, 2024
df673a4
Try reduce merge conflicts
emhane Feb 22, 2024
13df3f1
Merge branch 'main' into emhane/one-req-per-peer-bottleneck
emhane Feb 22, 2024
fc5df9d
Fix lint
emhane Feb 22, 2024
e794ccb
Fix merge conflicts with main
emhane Feb 22, 2024
fd52b6c
Fix docs
emhane Feb 22, 2024
84eb44d
Merge branch 'main' into emhane/one-req-per-peer-bottleneck
emhane Feb 22, 2024
72a8ef5
Fix lint
emhane Feb 22, 2024
9feb4b1
Fix merge conflicts with emhane/one-req-per-peer-bottleneck
emhane Feb 22, 2024
963012b
Fix missing docs
emhane Feb 22, 2024
9065482
Fix merge conflicts
emhane Feb 22, 2024
358f3ac
Fix merge conflicts with emhane/one-req-per-peer-bottleneck
emhane Feb 22, 2024
2aadec6
Fix merge conflicts
emhane Feb 22, 2024
842c78d
Fix merge conflicts
emhane Feb 22, 2024
529c1d0
Merge branch 'emhane/one-req-per-peer-bottleneck' into emhane/validat…
emhane Feb 22, 2024
ddf0176
Fix lint
emhane Feb 22, 2024
9de1fb7
Fix lint
emhane Feb 22, 2024
a601871
Fix lint
emhane Feb 22, 2024
b4d4a02
Revert signature of retain_by_hash and retain_unknown
emhane Feb 26, 2024
74103ea
Update docs
emhane Feb 26, 2024
3eef7a1
Merge branch 'main' into emhane/validate-tx-response
emhane Feb 26, 2024
c0d6352
Fix conflicts of changing base to main
emhane Feb 27, 2024
f01b94b
Fix whitespace
emhane Feb 27, 2024
a0d253a
Nitpick
emhane Feb 27, 2024
ad77de2
Add issue link to comment
emhane Feb 27, 2024
aab3a03
Merge branch 'emhane/validate-tx-response' of github.com:paradigmxyz/…
emhane Feb 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

280 changes: 148 additions & 132 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};

use derive_more::{Constructor, Deref, DerefMut, IntoIterator};
use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
use reth_codecs::derive_arbitrary;
use reth_primitives::{
Block, Bytes, PooledTransactionsElement, TransactionSigned, TxHash, B256, U128,
Expand Down Expand Up @@ -441,28 +441,27 @@ impl Decodable for NewPooledTransactionHashes68 {
}
}

/// Interface for handling mempool message data. Used in various filters in pipelines in
/// `TransactionsManager` and in queries to `TransactionPool`.
pub trait HandleMempoolData {
/// The announcement contains no entries.
/// Validation pass that checks for unique transaction hashes.
pub trait DedupPayload {
/// Value type in [`PartiallyValidData`] map.
type Value;

/// The payload contains no entries.
fn is_empty(&self) -> bool;

/// Returns the number of entries.
fn len(&self) -> usize;

/// Retain only entries for which the hash in the entry satisfies a given predicate, return
/// the rest.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self;
/// Consumes self, returning an iterator over hashes in payload.
fn dedup(self) -> PartiallyValidData<Self::Value>;
}

/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
pub trait HandleVersionedMempoolData {
/// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
/// [`Eth68`](EthVersion::Eth68).
fn msg_version(&self) -> EthVersion;
}
/// Value in [`PartiallyValidData`] map obtained from an announcement.
pub type Eth68TxMetadata = Option<(u8, usize)>;

impl DedupPayload for NewPooledTransactionHashes {
type Value = Eth68TxMetadata;

impl HandleMempoolData for NewPooledTransactionHashes {
fn is_empty(&self) -> bool {
self.is_empty()
}
Expand All @@ -471,21 +470,17 @@ impl HandleMempoolData for NewPooledTransactionHashes {
self.len()
}

fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self {
fn dedup(self) -> PartiallyValidData<Self::Value> {
match self {
NewPooledTransactionHashes::Eth66(msg) => Self::Eth66(msg.retain_by_hash(f)),
NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)),
NewPooledTransactionHashes::Eth66(msg) => msg.dedup(),
NewPooledTransactionHashes::Eth68(msg) => msg.dedup(),
}
}
}

impl HandleVersionedMempoolData for NewPooledTransactionHashes {
fn msg_version(&self) -> EthVersion {
self.version()
}
}
impl DedupPayload for NewPooledTransactionHashes68 {
type Value = Eth68TxMetadata;

impl HandleMempoolData for NewPooledTransactionHashes68 {
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}
Expand All @@ -494,38 +489,24 @@ impl HandleMempoolData for NewPooledTransactionHashes68 {
self.hashes.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, hash) in self.hashes.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
let Self { hashes, mut sizes, mut types } = self;

let mut deduped_data = HashMap::with_capacity(hashes.len());

let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
let mut removed_types = Vec::with_capacity(indices_to_remove.len());
let mut removed_sizes = Vec::with_capacity(indices_to_remove.len());

for index in indices_to_remove.into_iter().rev() {
let hash = self.hashes.remove(index);
removed_hashes.push(hash);
let ty = self.types.remove(index);
removed_types.push(ty);
let size = self.sizes.remove(index);
removed_sizes.push(size);
for hash in hashes.into_iter().rev() {
if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
deduped_data.insert(hash, Some((ty, size)));
}
}

Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes }
PartiallyValidData::from_raw_data_eth68(deduped_data)
}
}

impl HandleVersionedMempoolData for NewPooledTransactionHashes68 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth68
}
}
impl DedupPayload for NewPooledTransactionHashes66 {
type Value = Eth68TxMetadata;

impl HandleMempoolData for NewPooledTransactionHashes66 {
fn is_empty(&self) -> bool {
self.0.is_empty()
}
Expand All @@ -534,100 +515,163 @@ impl HandleMempoolData for NewPooledTransactionHashes66 {
self.0.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, hash) in self.0.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
let Self(hashes) = self;

let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
let mut deduped_data = HashMap::with_capacity(hashes.len());

for index in indices_to_remove.into_iter().rev() {
let hash = self.0.remove(index);
removed_hashes.push(hash);
let noop_value: Eth68TxMetadata = None;

for hash in hashes.into_iter().rev() {
deduped_data.insert(hash, noop_value);
}

Self(removed_hashes)
PartiallyValidData::from_raw_data_eth66(deduped_data)
}
}

impl HandleVersionedMempoolData for NewPooledTransactionHashes66 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth66
/// Interface for handling mempool message data. Used in various filters in pipelines in
/// `TransactionsManager` and in queries to `TransactionPool`.
pub trait HandleMempoolData {
/// The announcement contains no entries.
fn is_empty(&self) -> bool;

/// Returns the number of entries.
fn len(&self) -> usize;

/// Retain only entries for which the hash in the entry satisfies a given predicate.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool);
}

/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
pub trait HandleVersionedMempoolData {
/// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
/// [`Eth68`](EthVersion::Eth68).
fn msg_version(&self) -> EthVersion;
}

impl HandleMempoolData for Vec<PooledTransactionsElement> {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn len(&self) -> usize {
self.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
self.retain(|tx| f(tx.hash()))
}
}

/// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`.
#[derive(Debug, Deref, DerefMut, IntoIterator, Constructor)]
pub struct ValidAnnouncementData {
macro_rules! handle_mempool_data_map_impl {
($data_ty:ty, $(<$generic:ident>)?) => {
impl$(<$generic>)? HandleMempoolData for $data_ty {
fn is_empty(&self) -> bool {
self.data.is_empty()
}

fn len(&self) -> usize {
self.data.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
self.data.retain(|hash, _| f(hash));
}
}
};
}

/// Data that has passed an initial validation pass that is not specific to any mempool message
/// type.
#[derive(Debug, Deref, DerefMut, IntoIterator)]
pub struct PartiallyValidData<V> {
#[deref]
#[deref_mut]
#[into_iterator]
data: HashMap<TxHash, Option<(u8, usize)>>,
version: EthVersion,
data: HashMap<TxHash, V>,
version: Option<EthVersion>,
emhane marked this conversation as resolved.
Show resolved Hide resolved
}

impl ValidAnnouncementData {
/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth68(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self::new(data, EthVersion::Eth68)
handle_mempool_data_map_impl!(PartiallyValidData<V>, <V>);

impl<V> PartiallyValidData<V> {
/// Wraps raw data.
pub fn from_raw_data(data: HashMap<TxHash, V>, version: Option<EthVersion>) -> Self {
Self { data, version }
}

/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth66(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self::new(data, EthVersion::Eth66)
/// Wraps raw data with version [`EthVersion::Eth68`].
pub fn from_raw_data_eth68(data: HashMap<TxHash, V>) -> Self {
Self::from_raw_data(data, Some(EthVersion::Eth68))
}

/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth68`](EthVersion::Eth68)
/// Wraps raw data with version [`EthVersion::Eth66`].
pub fn from_raw_data_eth66(data: HashMap<TxHash, V>) -> Self {
Self::from_raw_data(data, Some(EthVersion::Eth66))
}

/// Returns a new [`PartiallyValidData`] with empty data from an [`Eth68`](EthVersion::Eth68)
/// announcement.
pub fn empty_eth68() -> Self {
Self::new_eth68(HashMap::new())
Self::from_raw_data_eth68(HashMap::new())
}

/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth66`](EthVersion::Eth66)
/// Returns a new [`PartiallyValidData`] with empty data from an [`Eth66`](EthVersion::Eth66)
/// announcement.
pub fn empty_eth66() -> Self {
Self::new_eth66(HashMap::new())
Self::from_raw_data_eth66(HashMap::new())
}

/// Returns the version of the message this data was received in if different versions of the
/// message exists, either [`Eth66`](EthVersion::Eth66) or [`Eth68`](EthVersion::Eth68).
pub fn msg_version(&self) -> Option<EthVersion> {
self.version
}

/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Option<(u8, usize)>> {
pub fn into_data(self) -> HashMap<TxHash, V> {
self.data
}
}

/// Partially validated data from an announcement or a
/// [`PooledTransactions`](crate::PooledTransactions) response.
#[derive(Debug, Deref, DerefMut, IntoIterator, From)]
#[from(PartiallyValidData<Eth68TxMetadata>)]
pub struct ValidAnnouncementData {
#[deref]
#[deref_mut]
#[into_iterator]
data: HashMap<TxHash, Eth68TxMetadata>,
version: EthVersion,
}

handle_mempool_data_map_impl!(ValidAnnouncementData,);

impl ValidAnnouncementData {
/// Destructs returning only the valid hashes and the announcement message version. Caution! If
/// this is [`Eth68`](EthVersion::Eth68)announcement data, the metadata must be cached
/// before call.
/// this is [`Eth68`](EthVersion::Eth68) announcement data, this drops the metadata.
pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
let hashes = self.data.into_keys().collect::<Vec<_>>();

(RequestTxHashes::new(hashes), self.version)
}
}

impl HandleMempoolData for ValidAnnouncementData {
fn is_empty(&self) -> bool {
self.data.is_empty()
}

fn len(&self) -> usize {
self.data.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let data = std::mem::take(&mut self.data);
/// Conversion from [`PartiallyValidData`] from an announcement. Note! [`PartiallyValidData`]
/// from an announcement, should have some [`EthVersion`]. Panics if [`PartiallyValidData`] has
/// version set to `None`.
mattsse marked this conversation as resolved.
Show resolved Hide resolved
pub fn from_partially_valid_data(data: PartiallyValidData<Eth68TxMetadata>) -> Self {
let PartiallyValidData { data, version } = data;

let (keep, rest) = data.into_iter().partition(|(hash, _)| f(hash));
let version = version.expect("should have eth version for conversion");

self.data = keep;
Self { data, version }
}

ValidAnnouncementData::new(rest, self.version)
/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Eth68TxMetadata> {
self.data
}
}

Expand Down Expand Up @@ -656,8 +700,8 @@ impl RequestTxHashes {
}
}

impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
fn from_iter<I: IntoIterator<Item = (TxHash, Option<(u8, usize)>)>>(iter: I) -> Self {
impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes {
fn from_iter<I: IntoIterator<Item = (TxHash, Eth68TxMetadata)>>(iter: I) -> Self {
let mut hashes = Vec::with_capacity(32);

for (hash, _) in iter {
Expand All @@ -670,34 +714,6 @@ impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
}
}

impl HandleMempoolData for Vec<PooledTransactionsElement> {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn len(&self) -> usize {
self.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, tx) in self.iter().enumerate() {
if !f(tx.hash()) {
indices_to_remove.push(i);
}
}

let mut removed_txns = Vec::with_capacity(indices_to_remove.len());

for index in indices_to_remove.into_iter().rev() {
let hash = self.remove(index);
removed_txns.push(hash);
}

removed_txns
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading