From 77683f9cef7eaff41a25c3029e174fd936f65efc Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 17 Jan 2024 11:02:29 +0100 Subject: [PATCH] wip --- Cargo.lock | 1 + crates/p2p/Cargo.toml | 1 + crates/p2p/src/client/peer_agnostic.rs | 53 ++++++- crates/p2p/src/client/peer_agnostic/parse.rs | 137 ++++++++++++++----- crates/p2p/src/client/types.rs | 2 +- 5 files changed, 155 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14f2cc123b..b18850e000 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5742,6 +5742,7 @@ dependencies = [ "pathfinder-crypto", "prost 0.12.1", "rand", + "rayon", "rstest", "serde", "serde_json", diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 9a57a852c9..755b1eb382 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -37,6 +37,7 @@ pathfinder-common = { path = "../common" } pathfinder-crypto = { path = "../crypto" } prost = "0.12.1" rand = { workspace = true } +rayon = "1.8.0" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = "0.10.7" diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index bd95d1470a..1ffe5d02ea 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -14,8 +14,10 @@ use p2p_proto::event::EventsRequest; use p2p_proto::receipt::{Receipt, ReceiptsRequest}; use p2p_proto::transaction::TransactionsRequest; use pathfinder_common::{ - event::Event, transaction::TransactionVariant, BlockHash, BlockNumber, TransactionHash, + event::Event, transaction::TransactionVariant, BlockHash, BlockNumber, ContractAddress, + TransactionHash, }; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tokio::sync::RwLock; use crate::sync::protocol; @@ -237,6 +239,55 @@ impl Client { match response_receiver { Ok(rx) => { if let Some(parsed) = parse::(&peer, rx).await { + let deploy_account = parsed.deploy_account; + let mut parsed = parsed.other; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + rayon::spawn(move || { + // Now we can compute the missing addresses + let computed: Vec<_> = deploy_account + .into_par_iter() + .map(|(block_hash, transactions)| { + ( + block_hash, + transactions + .into_par_iter() + .map(|t| match t { + TransactionVariant::DeployAccountV0V1(mut x) => { + let contract_address = + ContractAddress::deployed_contract_address( + x.constructor_calldata.iter().copied(), + &x.contract_address_salt, + &x.class_hash, + ); + x.contract_address = contract_address; + TransactionVariant::DeployAccountV0V1(x) + } + TransactionVariant::DeployAccountV3(mut x) => { + let contract_address = + ContractAddress::deployed_contract_address( + x.constructor_calldata.iter().copied(), + &x.contract_address_salt, + &x.class_hash, + ); + x.contract_address = contract_address; + TransactionVariant::DeployAccountV3(x) + } + _ => unreachable!(), + }) + .collect::>(), + ) + }) + .collect(); + + // Send the result back to Tokio. + let _ = tx.send(computed); + }); + + let computed = rx.await?; + parsed.extend(computed); + return Ok(parsed); } } diff --git a/crates/p2p/src/client/peer_agnostic/parse.rs b/crates/p2p/src/client/peer_agnostic/parse.rs index 3eb88ad3a1..2ba987c85b 100644 --- a/crates/p2p/src/client/peer_agnostic/parse.rs +++ b/crates/p2p/src/client/peer_agnostic/parse.rs @@ -296,24 +296,85 @@ pub(crate) mod transactions { Uninitialized, Transactions { last_id: BlockId, - transactions: HashMap>, + transactions: InnerTransactions, }, Delimited { - transactions: HashMap>, + transactions: InnerTransactions, }, DelimitedWithError { error: Error, - transactions: HashMap>, + transactions: InnerTransactions, }, Empty { error: Option, }, } + #[derive(Debug)] + pub struct InnerTransactions { + pub deploy_account: HashMap>, + pub other: HashMap>, + } + + impl InnerTransactions { + pub fn is_empty(&self) -> bool { + self.deploy_account.is_empty() && self.other.is_empty() + } + + pub fn contains_key(&self, id: &BlockId) -> bool { + self.deploy_account.contains_key(id) || self.other.contains_key(id) + } + } + + #[derive(Debug)] + pub struct ParsedTransactions { + pub deploy_account: HashMap>, + pub other: HashMap>, + } + + impl From for ParsedTransactions { + fn from(inner: InnerTransactions) -> Self { + Self { + deploy_account: inner + .deploy_account + .into_iter() + .map(|(k, v)| (BlockHash(k.hash.0), v)) + .collect(), + other: inner + .other + .into_iter() + .map(|(k, v)| (BlockHash(k.hash.0), v)) + .collect(), + } + } + } + + fn try_from_dto( + dtos: Vec, + ) -> anyhow::Result<(Vec, Vec)> { + let mut deploy_account_txns = Vec::new(); + let mut other_txns = Vec::new(); + + for dto in dtos { + let transaction = + TransactionVariant::try_from_dto(dto).context("parsing transaction")?; + if matches!( + transaction, + TransactionVariant::DeployAccountV0V1(_) | TransactionVariant::DeployAccountV3(_) + ) { + deploy_account_txns.push(transaction); + } else { + other_txns.push(transaction); + } + } + + Ok((deploy_account_txns, other_txns)) + } + impl super::ParserState for State { type Dto = TransactionsResponse; - type Inner = HashMap>; - type Out = HashMap>; + type Inner = InnerTransactions; + type Out = ParsedTransactions; fn transition(self, next: Self::Dto) -> anyhow::Result { let TransactionsResponse { id, kind } = next; @@ -323,18 +384,25 @@ pub(crate) mod transactions { State::Uninitialized, Some(id), TransactionsResponseKind::Transactions(Transactions { items }), - ) => State::Transactions { - last_id: id, - transactions: [( - id, - items - .into_iter() - .map(TransactionVariant::try_from_dto) - .collect::>>() - .context("parsing transactions")?, - )] - .into(), - }, + ) => { + let mut deploy_account = HashMap::new(); + let mut other = HashMap::new(); + let (new_deploy_account, new_other) = try_from_dto(items)?; + if !deploy_account.is_empty() { + deploy_account.insert(id, new_deploy_account); + } + if !other.is_empty() { + other.insert(id, new_other); + } + + State::Transactions { + last_id: id, + transactions: InnerTransactions { + deploy_account, + other, + }, + } + } // The peer does not have anything we asked for (State::Uninitialized, _, TransactionsResponseKind::Fin(Fin { error })) => { State::Empty { error } @@ -348,16 +416,17 @@ pub(crate) mod transactions { Some(id), TransactionsResponseKind::Transactions(Transactions { items }), ) if last_id == id => { + let (new_deploy_account, new_other) = try_from_dto(items)?; transactions + .deploy_account .get_mut(&id) - .expect("transactions for this id is present") - .extend( - items - .into_iter() - .map(TransactionVariant::try_from_dto) - .collect::>>() - .context("parsing transactions")?, - ); + .expect("this id is present") + .extend(new_deploy_account); + transactions + .other + .get_mut(&id) + .expect("this id is present") + .extend(new_other); State::Transactions { last_id, @@ -391,17 +460,14 @@ pub(crate) mod transactions { anyhow::bail!("unexpected response"); } - transactions.insert( - id, - items - .into_iter() - .map(TransactionVariant::try_from_dto) - .collect::>>() - .context("parsing transactions")?, - ); + let (new_deploy_account, new_other) = try_from_dto(items)?; + + transactions.deploy_account.insert(id, new_deploy_account); + transactions.other.insert(id, new_other); State::Transactions { last_id: id, + transactions, } } @@ -410,10 +476,7 @@ pub(crate) mod transactions { } fn from_inner(inner: Self::Inner) -> Self::Out { - inner - .into_iter() - .map(|(k, v)| (BlockHash(k.hash.0), v)) - .collect() + inner.into() } impl_take_parsed_and_should_stop!(transactions); diff --git a/crates/p2p/src/client/types.rs b/crates/p2p/src/client/types.rs index 26b778bb48..1a7b4bff8d 100644 --- a/crates/p2p/src/client/types.rs +++ b/crates/p2p/src/client/types.rs @@ -194,7 +194,7 @@ impl TryFromDto for TransactionVariant { /// ## Important /// /// This conversion does not compute deployed contract address for deploy account transactions - /// ([`TransactionVariant::DeployAccountTransactionV0V1`] and [`TransactionVariant::DeployAccountTransactionV3`]), + /// ([`TransactionVariant::DeployAccountV0V1`] and [`TransactionVariant::DeployAccountV3`]), /// filling it with a zero address instead. The caller is responsible for performing the computation after the conversion succeeds. fn try_from_dto(dto: p2p_proto::transaction::Transaction) -> anyhow::Result where