diff --git a/Cargo.lock b/Cargo.lock index eb7090ba0..54c714b71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4149,6 +4149,23 @@ dependencies = [ "zcash_primitives", ] +[[package]] +name = "zingo-netutils" +version = "0.1.0" +dependencies = [ + "http", + "http-body", + "hyper", + "hyper-rustls", + "prost", + "rustls-pemfile", + "tokio-rustls 0.23.4", + "tonic", + "tower", + "webpki-roots 0.21.1", + "zcash_client_backend", +] + [[package]] name = "zingo-status" version = "0.1.0" @@ -4178,6 +4195,7 @@ dependencies = [ "zcash_address", "zcash_client_backend", "zcash_primitives", + "zingo-netutils", "zingoconfig", "zingolib", ] @@ -4243,7 +4261,6 @@ dependencies = [ "ring 0.17.8", "ripemd160", "rust-embed", - "rustls-pemfile", "sapling-crypto", "secp256k1", "serde", @@ -4254,7 +4271,6 @@ dependencies = [ "subtle", "tempfile", "tokio", - "tokio-rustls 0.23.4", "tokio-stream", "tonic", "tonic-build", @@ -4262,7 +4278,6 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", - "webpki-roots 0.21.1", "zcash_address", "zcash_client_backend", "zcash_encoding", @@ -4270,6 +4285,7 @@ dependencies = [ "zcash_primitives", "zcash_proofs", "zingo-memo", + "zingo-netutils", "zingo-status", "zingo-testvectors", "zingoconfig", diff --git a/Cargo.toml b/Cargo.toml index d0cfbe7d8..77a574f93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "zingolib", "zingoconfig", "zingo-testvectors", + "zingo-netutils", "zingo-memo", ] resolver = "2" @@ -41,6 +42,7 @@ tonic = {version = "0.10.0", features = ["tls", "tls-roots", "tls-webpki-roots"] prost = "0.12.0" tower = { version = "0.4" } hex = "0.4" +tokio-rustls = "0.23" [profile.release] debug = false diff --git a/darkside-tests/src/utils.rs b/darkside-tests/src/utils.rs index 483711d91..4778fa6e1 100644 --- a/darkside-tests/src/utils.rs +++ b/darkside-tests/src/utils.rs @@ -281,7 +281,7 @@ pub async fn update_tree_states_for_transaction( raw_tx: RawTransaction, height: u64, ) -> TreeState { - let trees = zingolib::grpc_connector::GrpcConnector::get_trees(server_id.clone(), height - 1) + let trees = zingolib::grpc_connector::get_trees(server_id.clone(), height - 1) .await .unwrap(); let mut sapling_tree: sapling_crypto::CommitmentTree = read_commitment_tree( diff --git a/integration-tests/tests/integrations.rs b/integration-tests/tests/integrations.rs index 6bb1566c0..0ab6a6a06 100644 --- a/integration-tests/tests/integrations.rs +++ b/integration-tests/tests/integrations.rs @@ -2587,7 +2587,7 @@ mod slow { .witness_tree_orchard .max_leaf_position(0) .unwrap(); - let server_trees = zingolib::grpc_connector::GrpcConnector::get_trees( + let server_trees = zingolib::grpc_connector::get_trees( recipient.get_server_uri(), recipient.wallet.last_synced_height().await, ) diff --git a/zingo-netutils/Cargo.toml b/zingo-netutils/Cargo.toml new file mode 100644 index 000000000..4848ae4f2 --- /dev/null +++ b/zingo-netutils/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "zingo-netutils" +version = "0.1.0" +authors = ["zingo@zingolabs.org"] +edition = "2021" + +[dependencies] +http.workspace = true +tokio-rustls.workspace = true +zcash_client_backend.workspace = true +tower.workspace = true +hyper-rustls.workspace = true +webpki-roots = "0.21.0" +hyper.workspace = true +http-body.workspace = true +tonic.workspace = true +prost.workspace = true +rustls-pemfile = "1.0.0" + +[features] +test = [] + + diff --git a/zingo-netutils/src/lib.rs b/zingo-netutils/src/lib.rs new file mode 100644 index 000000000..f4f2aed0f --- /dev/null +++ b/zingo-netutils/src/lib.rs @@ -0,0 +1,125 @@ +use std::sync::Arc; +use tower::ServiceExt; + +use http::Uri; +use http_body::combinators::UnsyncBoxBody; +use hyper::client::HttpConnector; +use tokio_rustls::rustls::{ClientConfig, RootCertStore}; +use tonic::Status; +use tower::util::BoxCloneService; +use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient; + +type UnderlyingService = BoxCloneService< + http::Request>, + http::Response, + hyper::Error, +>; + +#[derive(Clone)] +pub struct GrpcConnector { + uri: http::Uri, +} + +impl GrpcConnector { + pub fn new(uri: http::Uri) -> Self { + Self { uri } + } + + pub fn uri(&self) -> &Uri { + &self.uri + } + + pub fn get_client( + &self, + ) -> impl std::future::Future< + Output = Result, Box>, + > { + let uri = Arc::new(self.uri.clone()); + async move { + let mut http_connector = HttpConnector::new(); + http_connector.enforce_http(false); + if uri.scheme_str() == Some("https") { + let mut roots = RootCertStore::empty(); + roots.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map( + |anchor_ref| { + tokio_rustls::rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + anchor_ref.subject, + anchor_ref.spki, + anchor_ref.name_constraints, + ) + }, + )); + + #[cfg(test)] + add_test_cert_to_roots(&mut roots); + + let tls = ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); + let connector = tower::ServiceBuilder::new() + .layer_fn(move |s| { + let tls = tls.clone(); + + hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(tls) + .https_or_http() + .enable_http2() + .wrap_connector(s) + }) + .service(http_connector); + let client = Box::new(hyper::Client::builder().build(connector)); + let uri = uri.clone(); + let svc = tower::ServiceBuilder::new() + //Here, we take all the pieces of our uri, and add in the path from the Requests's uri + .map_request(move |mut req: http::Request| { + let uri = Uri::builder() + .scheme(uri.scheme().unwrap().clone()) + .authority(uri.authority().unwrap().clone()) + //here. The Request's uri contains the path to the GRPC sever and + //the method being called + .path_and_query(req.uri().path_and_query().unwrap().clone()) + .build() + .unwrap(); + + *req.uri_mut() = uri; + req + }) + .service(client); + + Ok(CompactTxStreamerClient::new(svc.boxed_clone())) + } else { + let connector = tower::ServiceBuilder::new().service(http_connector); + let client = Box::new(hyper::Client::builder().http2_only(true).build(connector)); + let uri = uri.clone(); + let svc = tower::ServiceBuilder::new() + //Here, we take all the pieces of our uri, and add in the path from the Requests's uri + .map_request(move |mut req: http::Request| { + let uri = Uri::builder() + .scheme(uri.scheme().unwrap().clone()) + .authority(uri.authority().unwrap().clone()) + //here. The Request's uri contains the path to the GRPC sever and + //the method being called + .path_and_query(req.uri().path_and_query().unwrap().clone()) + .build() + .unwrap(); + + *req.uri_mut() = uri; + req + }) + .service(client); + + Ok(CompactTxStreamerClient::new(svc.boxed_clone())) + } + } + } +} + +#[cfg(test)] +fn add_test_cert_to_roots(roots: &mut RootCertStore) { + const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem"; + let fd = std::fs::File::open(TEST_PEMFILE_PATH).unwrap(); + let mut buf = std::io::BufReader::new(&fd); + let certs = rustls_pemfile::certs(&mut buf).unwrap(); + roots.add_parsable_certificates(&certs); +} diff --git a/zingo-testutils/Cargo.toml b/zingo-testutils/Cargo.toml index 770757c45..b6e6c02e9 100644 --- a/zingo-testutils/Cargo.toml +++ b/zingo-testutils/Cargo.toml @@ -12,6 +12,7 @@ default = ["grpc-proxy"] [dependencies] zingoconfig = { path = "../zingoconfig" } zingolib = { path = "../zingolib", features = ["test"] } +zingo-netutils = { path = "../zingo-netutils", features = ["test"] } zcash_client_backend = { workspace = true } zcash_primitives = { workspace = true } diff --git a/zingo-testutils/src/grpc_proxy.rs b/zingo-testutils/src/grpc_proxy.rs index 88d16eb28..94c47b0e7 100644 --- a/zingo-testutils/src/grpc_proxy.rs +++ b/zingo-testutils/src/grpc_proxy.rs @@ -46,7 +46,7 @@ macro_rules! define_grpc_passthrough { } println!("Proxy passing through {rpc_name} call"); - ::zingolib::grpc_connector::GrpcConnector::new($self.lightwalletd_uri.clone()) + ::zingo_netutils::GrpcConnector::new($self.lightwalletd_uri.clone()) .get_client() .await .expect("Proxy server failed to create client") diff --git a/zingocli/Cargo.toml b/zingocli/Cargo.toml index 3a593b1e8..a9d5f065f 100644 --- a/zingocli/Cargo.toml +++ b/zingocli/Cargo.toml @@ -19,6 +19,6 @@ futures = "0.3.15" rustls-pemfile = "1.0.0" tokio = { version = "1.24.2", features = ["full"] } tokio-stream = "0.1.6" -tokio-rustls = "0.23.3" +tokio-rustls = { workspace = true } webpki-roots = "0.21.0" json = "0.12.4" diff --git a/zingolib/Cargo.toml b/zingolib/Cargo.toml index 2d8ffcc76..d34f9abde 100644 --- a/zingolib/Cargo.toml +++ b/zingolib/Cargo.toml @@ -20,6 +20,7 @@ zingoconfig = { path = "../zingoconfig" } zingo-memo = { path = "../zingo-memo" } zingo-status = { path = "../zingo-status" } zingo-testvectors = { path = "../zingo-testvectors", optional = true } +zingo-netutils = { path = "../zingo-netutils" } http-body = { workspace = true } hyper = { workspace = true } @@ -48,15 +49,12 @@ rand = "0.8.5" serde_json = "1.0.82" tokio = { version = "1.24.2", features = ["full"] } tokio-stream = "0.1.6" -tokio-rustls = "0.23.3" reqwest = { version = "0.11", features = ["json"] } -rustls-pemfile = "1.0.0" tower-http = { version = "0.2", features = ["add-extension"] } futures = { workspace = true } hex = { workspace = true } ring = "0.17.0" json = "0.12.4" -webpki-roots = "0.21.0" lazy_static = "1.4.0" secp256k1 = "=0.26.0" ripemd160 = "0.9.1" diff --git a/zingolib/src/blaze/block_management_reorg_detection.rs b/zingolib/src/blaze/block_management_reorg_detection.rs index 3e6b31732..029f158b6 100644 --- a/zingolib/src/blaze/block_management_reorg_detection.rs +++ b/zingolib/src/blaze/block_management_reorg_detection.rs @@ -1,12 +1,9 @@ use crate::wallet::traits::FromCommitment; -use crate::{ - grpc_connector::GrpcConnector, - wallet::{ - data::{BlockData, PoolNullifier}, - notes::ShieldedNoteInterface, - traits::DomainWalletExt, - transactions::TransactionMetadataSet, - }, +use crate::wallet::{ + data::{BlockData, PoolNullifier}, + notes::ShieldedNoteInterface, + traits::DomainWalletExt, + transactions::TransactionMetadataSet, }; use incrementalmerkletree::frontier::CommitmentTree; use incrementalmerkletree::{frontier, witness::IncrementalWitness, Hashable}; @@ -523,7 +520,7 @@ impl BlockManagementData { let tree = if prev_height < activation_height { frontier::CommitmentTree::<::Node, 32>::empty() } else { - let tree_state = GrpcConnector::get_trees(uri, prev_height).await?; + let tree_state = crate::grpc_connector::get_trees(uri, prev_height).await?; let tree = hex::decode(D::get_tree(&tree_state)).unwrap(); self.unverified_treestates.write().await.push(tree_state); read_commitment_tree(&tree[..]).map_err(|e| format!("{}", e))? diff --git a/zingolib/src/blaze/fetch_compact_blocks.rs b/zingolib/src/blaze/fetch_compact_blocks.rs index d6f41e423..b204dae0f 100644 --- a/zingolib/src/blaze/fetch_compact_blocks.rs +++ b/zingolib/src/blaze/fetch_compact_blocks.rs @@ -35,7 +35,7 @@ impl FetchCompactBlocks { debug!("Fetching blocks {}-{}", start, end); - grpc_client.get_block_range(start, end, senders).await?; + crate::grpc_connector::get_block_range(&grpc_client, start, end, senders).await?; } Ok(()) diff --git a/zingolib/src/grpc_connector.rs b/zingolib/src/grpc_connector.rs index a84347850..b37ef12f9 100644 --- a/zingolib/src/grpc_connector.rs +++ b/zingolib/src/grpc_connector.rs @@ -4,480 +4,362 @@ use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; -use http_body::combinators::UnsyncBoxBody; -use hyper::{client::HttpConnector, Uri}; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; use tokio::task::JoinHandle; -use tokio_rustls::rustls::{ClientConfig, RootCertStore}; use tonic::Request; -use tonic::Status; -use tower::{util::BoxCloneService, ServiceExt}; use zcash_client_backend::proto::compact_formats::CompactBlock; -use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient; use zcash_client_backend::proto::service::{ BlockId, BlockRange, ChainSpec, Empty, LightdInfo, RawTransaction, TransparentAddressBlockFilter, TreeState, TxFilter, }; use zcash_primitives::consensus::{BlockHeight, BranchId, Parameters}; use zcash_primitives::transaction::{Transaction, TxId}; +pub(crate) use zingo_netutils::GrpcConnector; + +pub async fn start_saplingtree_fetcher( + conn: &GrpcConnector, +) -> ( + tokio::task::JoinHandle<()>, + UnboundedSender<(u64, oneshot::Sender>)>, +) { + let (transmitter, mut receiver) = + unbounded_channel::<(u64, oneshot::Sender>)>(); + let uri = conn.uri().clone(); + + let h = tokio::spawn(async move { + let uri = uri.clone(); + while let Some((height, result_transmitter)) = receiver.recv().await { + result_transmitter + .send(get_trees(uri.clone(), height).await) + .unwrap() + } + }); -type UnderlyingService = BoxCloneService< - http::Request>, - http::Response, - hyper::Error, ->; - -#[derive(Clone)] -pub struct GrpcConnector { - uri: http::Uri, + (h, transmitter) } -impl GrpcConnector { - pub fn new(uri: http::Uri) -> Self { - Self { uri } - } - - pub fn get_client( - &self, - ) -> impl std::future::Future< - Output = Result, Box>, - > { - let uri = Arc::new(self.uri.clone()); - async move { - let mut http_connector = HttpConnector::new(); - http_connector.enforce_http(false); - if uri.scheme_str() == Some("https") { - let mut roots = RootCertStore::empty(); - roots.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map( - |anchor_ref| { - tokio_rustls::rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( - anchor_ref.subject, - anchor_ref.spki, - anchor_ref.name_constraints, - ) - }, - )); - - #[cfg(test)] - add_test_cert_to_roots(&mut roots); - - let tls = ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(roots) - .with_no_client_auth(); - let connector = tower::ServiceBuilder::new() - .layer_fn(move |s| { - let tls = tls.clone(); - - hyper_rustls::HttpsConnectorBuilder::new() - .with_tls_config(tls) - .https_or_http() - .enable_http2() - .wrap_connector(s) - }) - .service(http_connector); - let client = Box::new(hyper::Client::builder().build(connector)); - let uri = uri.clone(); - let svc = tower::ServiceBuilder::new() - //Here, we take all the pieces of our uri, and add in the path from the Requests's uri - .map_request(move |mut req: http::Request| { - let uri = Uri::builder() - .scheme(uri.scheme().unwrap().clone()) - .authority(uri.authority().unwrap().clone()) - //here. The Request's uri contains the path to the GRPC sever and - //the method being called - .path_and_query(req.uri().path_and_query().unwrap().clone()) - .build() - .unwrap(); - - *req.uri_mut() = uri; - req - }) - .service(client); - - Ok(CompactTxStreamerClient::new(svc.boxed_clone())) - } else { - let connector = tower::ServiceBuilder::new().service(http_connector); - let client = Box::new(hyper::Client::builder().http2_only(true).build(connector)); - let uri = uri.clone(); - let svc = tower::ServiceBuilder::new() - //Here, we take all the pieces of our uri, and add in the path from the Requests's uri - .map_request(move |mut req: http::Request| { - let uri = Uri::builder() - .scheme(uri.scheme().unwrap().clone()) - .authority(uri.authority().unwrap().clone()) - //here. The Request's uri contains the path to the GRPC sever and - //the method being called - .path_and_query(req.uri().path_and_query().unwrap().clone()) - .build() - .unwrap(); - - *req.uri_mut() = uri; - req - }) - .service(client); - - Ok(CompactTxStreamerClient::new(svc.boxed_clone())) +pub async fn start_taddr_transaction_fetcher( + conn: &GrpcConnector, +) -> ( + JoinHandle<()>, + oneshot::Sender<( + (Vec, u64, u64), + oneshot::Sender>>>, + )>, +) { + let (transmitter, receiver) = oneshot::channel::<( + (Vec, u64, u64), + oneshot::Sender>>>, + )>(); + let uri = conn.uri().clone(); + + let h = tokio::spawn(async move { + let uri = uri.clone(); + if let Ok(((taddrs, start_height, end_height), result_transmitter)) = receiver.await { + let mut transaction_receivers = vec![]; + let mut transaction_receivers_workers = vec![]; + + // Create a stream for every t-addr + for taddr in taddrs { + let (transaction_s, transaction_receiver) = unbounded_channel(); + transaction_receivers.push(transaction_receiver); + transaction_receivers_workers.push(tokio::spawn(get_taddr_transactions( + uri.clone(), + taddr, + start_height, + end_height, + transaction_s, + ))); } + + // Dispatch a set of receivers + result_transmitter.send(transaction_receivers).unwrap(); + + // // Wait for all the t-addr transactions to be fetched from LightwalletD and sent to the h1 handle. + join_all(transaction_receivers_workers).await; } - } + }); + + (h, transmitter) +} - pub async fn start_saplingtree_fetcher( - &self, - ) -> ( - JoinHandle<()>, - UnboundedSender<(u64, oneshot::Sender>)>, - ) { - let (transmitter, mut receiver) = - unbounded_channel::<(u64, oneshot::Sender>)>(); - let uri = self.uri.clone(); - - let h = tokio::spawn(async move { +pub async fn start_full_transaction_fetcher( + conn: &GrpcConnector, + network: impl Parameters + Send + Copy + 'static, +) -> ( + JoinHandle<()>, + UnboundedSender<(TxId, oneshot::Sender>)>, +) { + let (transmitter, mut receiver) = + unbounded_channel::<(TxId, oneshot::Sender>)>(); + let uri = conn.uri().clone(); + + let h = tokio::spawn(async move { + let mut workers = FuturesUnordered::new(); + while let Some((transaction_id, result_transmitter)) = receiver.recv().await { let uri = uri.clone(); - while let Some((height, result_transmitter)) = receiver.recv().await { + workers.push(tokio::spawn(async move { result_transmitter - .send(Self::get_trees(uri.clone(), height).await) + .send(get_full_transaction(uri.clone(), &transaction_id, network).await) .unwrap() - } - }); - - (h, transmitter) - } + })); - pub async fn start_taddr_transaction_fetcher( - &self, - ) -> ( - JoinHandle<()>, - oneshot::Sender<( - (Vec, u64, u64), - oneshot::Sender>>>, - )>, - ) { - let (transmitter, receiver) = oneshot::channel::<( - (Vec, u64, u64), - oneshot::Sender>>>, - )>(); - let uri = self.uri.clone(); - - let h = tokio::spawn(async move { - let uri = uri.clone(); - if let Ok(((taddrs, start_height, end_height), result_transmitter)) = receiver.await { - let mut transaction_receivers = vec![]; - let mut transaction_receivers_workers = vec![]; - - // Create a stream for every t-addr - for taddr in taddrs { - let (transaction_s, transaction_receiver) = unbounded_channel(); - transaction_receivers.push(transaction_receiver); - transaction_receivers_workers.push(tokio::spawn(Self::get_taddr_transactions( - uri.clone(), - taddr, - start_height, - end_height, - transaction_s, - ))); + // Do only 16 API calls in parallel, otherwise it might overflow OS's limit of + // number of simultaneous connections + if workers.len() > 16 { + while let Some(_r) = workers.next().await { + // Do nothing } - - // Dispatch a set of receivers - result_transmitter.send(transaction_receivers).unwrap(); - - // // Wait for all the t-addr transactions to be fetched from LightwalletD and sent to the h1 handle. - join_all(transaction_receivers_workers).await; } - }); - - (h, transmitter) - } - - pub async fn start_full_transaction_fetcher( - &self, - network: impl Parameters + Send + Copy + 'static, - ) -> ( - JoinHandle<()>, - UnboundedSender<(TxId, oneshot::Sender>)>, - ) { - let (transmitter, mut receiver) = - unbounded_channel::<(TxId, oneshot::Sender>)>(); - let uri = self.uri.clone(); - - let h = tokio::spawn(async move { - let mut workers = FuturesUnordered::new(); - while let Some((transaction_id, result_transmitter)) = receiver.recv().await { - let uri = uri.clone(); - workers.push(tokio::spawn(async move { - result_transmitter - .send( - Self::get_full_transaction(uri.clone(), &transaction_id, network).await, - ) - .unwrap() - })); - - // Do only 16 API calls in parallel, otherwise it might overflow OS's limit of - // number of simultaneous connections - if workers.len() > 16 { - while let Some(_r) = workers.next().await { - // Do nothing - } - } - } - }); - - (h, transmitter) - } - - pub async fn get_block_range( - &self, - start_height: u64, - end_height: u64, - senders: &[UnboundedSender; 2], - ) -> Result<(), String> { - let mut client = self.get_client().await.map_err(|e| format!("{}", e))?; - - let bs = BlockId { - height: start_height, - hash: vec![], - }; - let be = BlockId { - height: end_height, - hash: vec![], - }; - - let request = Request::new(BlockRange { - start: Some(bs), - end: Some(be), - }); - - let mut response = client - .get_block_range(request) - .await - .map_err(|e| format!("{}", e))? - .into_inner(); - - while let Some(block) = response.message().await.map_err(|e| format!("{}", e))? { - senders[0] - .send(block.clone()) - .map_err(|e| format!("{}", e))?; - senders[1].send(block).map_err(|e| format!("{}", e))?; } + }); - Ok(()) - } + (h, transmitter) +} - async fn get_full_transaction( - uri: http::Uri, - transaction_id: &TxId, - network: impl Parameters, - ) -> Result { - let client = Arc::new(GrpcConnector::new(uri)); - let request = Request::new(TxFilter { - block: None, - index: 0, - hash: transaction_id.as_ref().to_vec(), - }); - - // log::info!("Full fetching {}", transaction_id); - - let mut client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; - - let response = client - .get_transaction(request) - .await - .map_err(|e| format!("{}", e))? - .into_inner(); - - Transaction::read( - &response.data[..], - BranchId::for_height(&network, BlockHeight::from_u32(response.height as u32)), - ) - .map_err(|e| format!("Error parsing Transaction: {}", e)) +pub async fn get_block_range( + conn: &GrpcConnector, + start_height: u64, + end_height: u64, + senders: &[UnboundedSender; 2], +) -> Result<(), String> { + let mut client = conn.get_client().await.map_err(|e| format!("{}", e))?; + + let bs = BlockId { + height: start_height, + hash: vec![], + }; + let be = BlockId { + height: end_height, + hash: vec![], + }; + + let request = Request::new(BlockRange { + start: Some(bs), + end: Some(be), + }); + + let mut response = client + .get_block_range(request) + .await + .map_err(|e| format!("{}", e))? + .into_inner(); + + while let Some(block) = response.message().await.map_err(|e| format!("{}", e))? { + senders[0] + .send(block.clone()) + .map_err(|e| format!("{}", e))?; + senders[1].send(block).map_err(|e| format!("{}", e))?; } - async fn get_taddr_transactions( - uri: http::Uri, - taddr: String, - start_height: u64, - end_height: u64, - transactions_sender: UnboundedSender>, - ) -> Result<(), String> { - let client = Arc::new(GrpcConnector::new(uri)); - - // Make sure start_height is smaller than end_height, because the API expects it like that - let (start_height, end_height) = if start_height < end_height { - (start_height, end_height) - } else { - (end_height, start_height) - }; - - let start = Some(BlockId { - height: start_height, - hash: vec![], - }); - let end = Some(BlockId { - height: end_height, - hash: vec![], - }); - - let args = TransparentAddressBlockFilter { - address: taddr, - range: Some(BlockRange { start, end }), - }; - let request = Request::new(args.clone()); - - let mut client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; - - let maybe_response = match client.get_taddress_txids(request).await { - Ok(r) => r, - Err(e) => { - if e.code() == tonic::Code::Unimplemented { - // Try the old, legacy API - let request = Request::new(args); - client - .get_taddress_txids(request) - .await - .map_err(|e| format!("{}", e))? - } else { - return Err(format!("{}", e)); - } - } - }; + Ok(()) +} - let mut response = maybe_response.into_inner(); +async fn get_full_transaction( + uri: http::Uri, + transaction_id: &TxId, + network: impl Parameters, +) -> Result { + let client = Arc::new(GrpcConnector::new(uri)); + let request = Request::new(TxFilter { + block: None, + index: 0, + hash: transaction_id.as_ref().to_vec(), + }); + + // log::info!("Full fetching {}", transaction_id); + + let mut client = client + .get_client() + .await + .map_err(|e| format!("Error getting client: {:?}", e))?; + + let response = client + .get_transaction(request) + .await + .map_err(|e| format!("{}", e))? + .into_inner(); + + Transaction::read( + &response.data[..], + BranchId::for_height(&network, BlockHeight::from_u32(response.height as u32)), + ) + .map_err(|e| format!("Error parsing Transaction: {}", e)) +} - while let Some(transaction) = response.message().await.map_err(|e| format!("{}", e))? { - transactions_sender.send(Ok(transaction)).unwrap(); +async fn get_taddr_transactions( + uri: http::Uri, + taddr: String, + start_height: u64, + end_height: u64, + transactions_sender: UnboundedSender>, +) -> Result<(), String> { + let client = Arc::new(GrpcConnector::new(uri)); + + // Make sure start_height is smaller than end_height, because the API expects it like that + let (start_height, end_height) = if start_height < end_height { + (start_height, end_height) + } else { + (end_height, start_height) + }; + + let start = Some(BlockId { + height: start_height, + hash: vec![], + }); + let end = Some(BlockId { + height: end_height, + hash: vec![], + }); + + let args = TransparentAddressBlockFilter { + address: taddr, + range: Some(BlockRange { start, end }), + }; + let request = Request::new(args.clone()); + + let mut client = client + .get_client() + .await + .map_err(|e| format!("Error getting client: {:?}", e))?; + + let maybe_response = match client.get_taddress_txids(request).await { + Ok(r) => r, + Err(e) => { + if e.code() == tonic::Code::Unimplemented { + // Try the old, legacy API + let request = Request::new(args); + client + .get_taddress_txids(request) + .await + .map_err(|e| format!("{}", e))? + } else { + return Err(format!("{}", e)); + } } + }; + + let mut response = maybe_response.into_inner(); - Ok(()) + while let Some(transaction) = response.message().await.map_err(|e| format!("{}", e))? { + transactions_sender.send(Ok(transaction)).unwrap(); } - pub async fn get_info(uri: http::Uri) -> Result { - let client = Arc::new(GrpcConnector::new(uri.clone())); + Ok(()) +} - let mut client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; +pub async fn get_info(uri: http::Uri) -> Result { + let client = Arc::new(GrpcConnector::new(uri.clone())); - let request = Request::new(Empty {}); + let mut client = client + .get_client() + .await + .map_err(|e| format!("Error getting client: {:?}", e))?; - let response = client - .get_lightd_info(request) - .await - .map_err(|e| format!("Error with get_lightd_info response at {uri}: {e:?}"))?; - Ok(response.into_inner()) - } + let request = Request::new(Empty {}); - pub async fn monitor_mempool( - uri: http::Uri, - mempool_transmitter: UnboundedSender, - ) -> Result<(), String> { - let client = Arc::new(GrpcConnector::new(uri)); - - let mut client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; - - let request = Request::new(Empty {}); - - let mut response = client - .get_mempool_stream(request) - .await - .map_err(|e| format!("{}", e))? - .into_inner(); - while let Some(r_transmitter) = response.message().await.map_err(|e| format!("{}", e))? { - mempool_transmitter - .send(r_transmitter) - .map_err(|e| format!("{}", e))?; - } + let response = client + .get_lightd_info(request) + .await + .map_err(|e| format!("Error with get_lightd_info response at {uri}: {e:?}"))?; + Ok(response.into_inner()) +} - Ok(()) +pub async fn monitor_mempool( + uri: http::Uri, + mempool_transmitter: UnboundedSender, +) -> Result<(), String> { + let client = Arc::new(GrpcConnector::new(uri)); + + let mut client = client + .get_client() + .await + .map_err(|e| format!("Error getting client: {:?}", e))?; + + let request = Request::new(Empty {}); + + let mut response = client + .get_mempool_stream(request) + .await + .map_err(|e| format!("{}", e))? + .into_inner(); + while let Some(r_transmitter) = response.message().await.map_err(|e| format!("{}", e))? { + mempool_transmitter + .send(r_transmitter) + .map_err(|e| format!("{}", e))?; } - pub async fn get_trees(uri: http::Uri, height: u64) -> Result { - let client = Arc::new(GrpcConnector::new(uri.clone())); - let mut client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; - - let b = BlockId { - height, - hash: vec![], - }; - let response = client - .get_tree_state(Request::new(b)) - .await - .map_err(|e| format!("Error with get_tree_state response at {uri}: {:?}", e))?; - - Ok(response.into_inner()) - } + Ok(()) +} - // get_latest_block GRPC call - pub async fn get_latest_block(uri: http::Uri) -> Result { - let client = Arc::new(GrpcConnector::new(uri.clone())); - let mut client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; +pub async fn get_trees(uri: http::Uri, height: u64) -> Result { + let client = Arc::new(GrpcConnector::new(uri.clone())); + let mut client = client + .get_client() + .await + .map_err(|e| format!("Error getting client: {:?}", e))?; + + let b = BlockId { + height, + hash: vec![], + }; + let response = client + .get_tree_state(Request::new(b)) + .await + .map_err(|e| format!("Error with get_tree_state response at {uri}: {:?}", e))?; + + Ok(response.into_inner()) +} - let request = Request::new(ChainSpec {}); +// get_latest_block GRPC call +pub async fn get_latest_block(uri: http::Uri) -> Result { + let client = Arc::new(GrpcConnector::new(uri.clone())); + let mut client = client + .get_client() + .await + .map_err(|e| format!("Error getting client: {:?}", e))?; - let response = client - .get_latest_block(request) - .await - .map_err(|e| format!("Error with get_latest_block response at {uri}: {:?}", e))?; + let request = Request::new(ChainSpec {}); - Ok(response.into_inner()) - } + let response = client + .get_latest_block(request) + .await + .map_err(|e| format!("Error with get_latest_block response at {uri}: {:?}", e))?; - pub async fn send_transaction( - uri: http::Uri, - transaction_bytes: Box<[u8]>, - ) -> Result { - let client = Arc::new(GrpcConnector::new(uri)); - let mut client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; - - let request = Request::new(RawTransaction { - data: transaction_bytes.to_vec(), - height: 0, - }); - - let response = client - .send_transaction(request) - .await - .map_err(|e| format!("Send Error: {}", e))?; - - let sendresponse = response.into_inner(); - if sendresponse.error_code == 0 { - let mut transaction_id = sendresponse.error_message; - if transaction_id.starts_with('\"') && transaction_id.ends_with('\"') { - transaction_id = transaction_id[1..transaction_id.len() - 1].to_string(); - } + Ok(response.into_inner()) +} - Ok(transaction_id) - } else { - Err(format!("Error: {:?}", sendresponse)) +pub async fn send_transaction( + uri: http::Uri, + transaction_bytes: Box<[u8]>, +) -> Result { + let client = Arc::new(GrpcConnector::new(uri)); + let mut client = client + .get_client() + .await + .map_err(|e| format!("Error getting client: {:?}", e))?; + + let request = Request::new(RawTransaction { + data: transaction_bytes.to_vec(), + height: 0, + }); + + let response = client + .send_transaction(request) + .await + .map_err(|e| format!("Send Error: {}", e))?; + + let sendresponse = response.into_inner(); + if sendresponse.error_code == 0 { + let mut transaction_id = sendresponse.error_message; + if transaction_id.starts_with('\"') && transaction_id.ends_with('\"') { + transaction_id = transaction_id[1..transaction_id.len() - 1].to_string(); } - } -} -#[cfg(test)] -fn add_test_cert_to_roots(roots: &mut RootCertStore) { - const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem"; - let fd = std::fs::File::open(TEST_PEMFILE_PATH).unwrap(); - let mut buf = std::io::BufReader::new(&fd); - let certs = rustls_pemfile::certs(&mut buf).unwrap(); - roots.add_parsable_certificates(&certs); + Ok(transaction_id) + } else { + Err(format!("Error: {:?}", sendresponse)) + } } diff --git a/zingolib/src/lib.rs b/zingolib/src/lib.rs index 510f2a090..8c651cf8b 100644 --- a/zingolib/src/lib.rs +++ b/zingolib/src/lib.rs @@ -25,7 +25,7 @@ pub fn get_latest_block_height(lightwalletd_uri: http::Uri) -> std::io::Result String { - match GrpcConnector::get_info(self.get_server_uri()).await { + match crate::grpc_connector::get_info(self.get_server_uri()).await { Ok(i) => { let o = object! { "version" => i.version, @@ -902,7 +902,10 @@ impl LightClient { receivers, transaction_submission_height, |transaction_bytes| { - GrpcConnector::send_transaction(self.get_server_uri(), transaction_bytes) + crate::grpc_connector::send_transaction( + self.get_server_uri(), + transaction_bytes, + ) }, ) .await @@ -974,7 +977,10 @@ impl LightClient { receiver, transaction_submission_height, |transaction_bytes| { - GrpcConnector::send_transaction(self.get_server_uri(), transaction_bytes) + crate::grpc_connector::send_transaction( + self.get_server_uri(), + transaction_bytes, + ) }, ) .await @@ -1100,7 +1106,7 @@ impl LightClient { "Getting sapling tree from LightwalletD at height {}", height ); - match GrpcConnector::get_trees(self.config.get_lightwalletd_uri(), height).await { + match crate::grpc_connector::get_trees(self.config.get_lightwalletd_uri(), height).await { Ok(tree_state) => { let hash = tree_state.hash.clone(); let tree = tree_state.sapling_tree.clone(); @@ -1123,7 +1129,7 @@ impl LightClient { async fn get_submission_height(&self) -> Result { Ok(BlockHeight::from_u32( - GrpcConnector::get_latest_block(self.config.get_lightwalletd_uri()) + crate::grpc_connector::get_latest_block(self.config.get_lightwalletd_uri()) .await? .height as u32, ) + 1) @@ -1338,7 +1344,7 @@ impl LightClient { let h2 = tokio::spawn(async move { loop { //debug!("Monitoring mempool"); - let r = GrpcConnector::monitor_mempool( + let r = crate::grpc_connector::monitor_mempool( uri.clone(), mempool_transmitter.clone(), ) @@ -1382,16 +1388,13 @@ impl LightClient { if self.wallet.has_any_empty_commitment_trees().await && last_synced_height >= self.config.sapling_activation_height() { - let trees = crate::grpc_connector::GrpcConnector::get_trees( - self.get_server_uri(), - last_synced_height, - ) - .await?; + let trees = + crate::grpc_connector::get_trees(self.get_server_uri(), last_synced_height).await?; self.wallet.initiate_witness_trees(trees).await; }; let latest_blockid = - GrpcConnector::get_latest_block(self.config.get_lightwalletd_uri()).await?; + crate::grpc_connector::get_latest_block(self.config.get_lightwalletd_uri()).await?; // Block hashes are reversed when stored in BlockDatas, so we reverse here to match let latest_blockid = crate::wallet::data::BlockData::new_with(latest_blockid.height, &latest_blockid.hash); @@ -1539,12 +1542,14 @@ impl LightClient { // Full Tx GRPC fetcher let (full_transaction_fetcher_handle, full_transaction_fetcher_transmitter) = - grpc_connector - .start_full_transaction_fetcher(self.config.chain) - .await; + crate::grpc_connector::start_full_transaction_fetcher( + &grpc_connector, + self.config.chain, + ) + .await; // Transparent Transactions Fetcher let (taddr_fetcher_handle, taddr_fetcher_transmitter) = - grpc_connector.start_taddr_transaction_fetcher().await; + crate::grpc_connector::start_taddr_transaction_fetcher(&grpc_connector).await; // Local state necessary for a transaction fetch let transaction_context = TransactionContext::new( diff --git a/zingolib/src/wallet.rs b/zingolib/src/wallet.rs index 1cfbad2a2..ba3c74779 100644 --- a/zingolib/src/wallet.rs +++ b/zingolib/src/wallet.rs @@ -28,6 +28,7 @@ use std::{ time::SystemTime, }; use tokio::sync::RwLock; +use zcash_primitives::zip339::Mnemonic; use zcash_client_backend::proto::service::TreeState; use zcash_encoding::{Optional, Vector};