Skip to content

Commit

Permalink
Send crawled transaction IDs to downloader (#2801)
Browse files Browse the repository at this point in the history
* Rename type parameter to be more explicit

Replace the single letter with a proper name.

* Remove imports for `Request` and `Response`

The type names will conflict with the ones for the mempool service.

* Attach `Mempool` service to the `Crawler`

Add a field to the `Crawler` type to store a way to access the `Mempool`
service.

* Forward crawled transactions to downloader

The crawled transactions are now sent to the transaction downloader and
verifier, to be included in the mempool.

* Derive `Eq` and `PartialEq` for `mempool::Request`

Make it simpler to use the `MockService::expect_request` method.

* Test if crawled transactions are downloaded

Create some dummy crawled transactions, and let the crawler discover
them. Then check if they are forwarded to the mempool to be downloaded
and verified.

* Don't send empty transaction ID list to downloader

Ignore response from peers that don't provide any crawled transactions.

* Log errors when forwarding crawled transaction IDs

Calling the Mempool service should not fail, so if an error happens it
should be visible. However, errors when downloading individual
transactions can happen from time to time, so there's no need for them
to be very visible.

* Document existing `mempool::Crawler` test

Provide some depth as to what the test expect from the crawler's
behavior.

* Refactor to create `setup_crawler` helper function

Make it easier to reuse the common test setup code.

* Simplify code to expect requests

Now that `zebra_network::Request` implement `Eq`, the call can be
simplified into `expect_request`.

* Refactor to create `respond_with_transaction_ids`

A helper function that checks for a network crawl request and responds
with the given list of crawled transaction IDs.

* Refactor to create `crawler_iterator` helper

A function to intercept and respond to the fanned-out requests sent
during a single crawl iteration.

* Refactor to create `respond_to_queue_request`

Reduce the repeated code necessary to intercept and reply to a request
for queuing transactions to be downloaded.

* Add `respond_to_queue_request_with_error` helper

Intercepts a mempool request to queue transactions to be downloaded, and
responds with an error, simulating an internal problem in the mempool
service implementation.

* Derive `Arbitrary` for `NetworkUpgrade`

This is required for deriving `Arbitrary` for some error types.

* Derive `Arbitrary` for `TransactionError`

Allow random transaction errors to be generated for property tests.

* Derive `Arbitrary` for `MempoolError`

Allow random Mempool errors to be generated for property tests.

* Test if errors don't stop the mempool crawler

The crawler should be robust enough to continue operating even if the
mempool service fails to download transactions or even fails to handle
requests to enqueue transactions.

* Reduce the log level for download errors

They should happen regularly, so there's no need to have them with a
high visibility level.

Co-authored-by: teor <[email protected]>

* Stop crawler if service stops

If `Mempool::poll_ready` returns an error, it's because the mempool
service has stopped and can't handle any requests, so the crawler should
stop as well.

Co-authored-by: teor <[email protected]>
Co-authored-by: Conrado Gouvea <[email protected]>
  • Loading branch information
3 people authored Oct 5, 2021
1 parent 8001414 commit 5d9893c
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 32 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions zebra-chain/src/parameters/network_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ use std::ops::Bound::*;

use chrono::{DateTime, Duration, Utc};

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;

/// A Zcash network upgrade.
///
/// Network upgrades can change the Zcash network protocol or consensus rules in
/// incompatible ways.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum NetworkUpgrade {
/// The Zcash protocol for a Genesis block.
///
Expand Down
9 changes: 9 additions & 0 deletions zebra-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ authors = ["Zcash Foundation <[email protected]>"]
license = "MIT OR Apache-2.0"
edition = "2018"

[features]
default = []
proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl"]

[dependencies]
blake2b_simd = "0.5.11"
bellman = "0.10.0"
Expand Down Expand Up @@ -33,8 +37,13 @@ zebra-state = { path = "../zebra-state" }
zebra-script = { path = "../zebra-script" }
wagyu-zcash-parameters = "0.2.0"

proptest = { version = "0.10", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

[dev-dependencies]
color-eyre = "0.5.11"
proptest = "0.10"
proptest-derive = "0.3.0"
rand07 = { package = "rand", version = "0.7" }
spandoc = "0.2"
tokio = { version = "0.3.6", features = ["full"] }
Expand Down
9 changes: 9 additions & 0 deletions zebra-consensus/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use thiserror::Error;

use crate::BoxError;

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;

#[derive(Error, Copy, Clone, Debug, PartialEq)]
pub enum SubsidyError {
#[error("no coinbase transaction in block")]
Expand All @@ -19,6 +22,7 @@ pub enum SubsidyError {
}

#[derive(Error, Clone, Debug, PartialEq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum TransactionError {
#[error("first transaction must be coinbase")]
CoinbasePosition,
Expand All @@ -45,6 +49,7 @@ pub enum TransactionError {
CoinbaseInMempool,

#[error("coinbase transaction failed subsidy validation")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
Subsidy(#[from] SubsidyError),

#[error("transaction version number MUST be >= 4")]
Expand All @@ -63,6 +68,7 @@ pub enum TransactionError {
BadBalance,

#[error("could not verify a transparent script")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
Script(#[from] zebra_script::Error),

#[error("spend description cv and rk MUST NOT be of small order")]
Expand All @@ -76,12 +82,15 @@ pub enum TransactionError {
#[error(
"Sprout joinSplitSig MUST represent a valid signature under joinSplitPubKey of dataToBeSigned"
)]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
Ed25519(#[from] zebra_chain::primitives::ed25519::Error),

#[error("Sapling bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
RedJubjub(zebra_chain::primitives::redjubjub::Error),

#[error("Orchard bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
RedPallas(zebra_chain::primitives::redpallas::Error),

// temporary error type until #1186 is fixed
Expand Down
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ proptest = "0.10"
proptest-derive = "0.3"

zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-consensus = { path = "../zebra-consensus/", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test" }

Expand Down
4 changes: 2 additions & 2 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ impl StartCmd {
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);

setup_tx
.send((peer_set.clone(), address_book, mempool))
.send((peer_set.clone(), address_book, mempool.clone()))
.map_err(|_| eyre!("could not send setup data to inbound service"))?;

select! {
result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => {
_ = mempool::Crawler::spawn(peer_set, mempool, sync_status).fuse() => {
unreachable!("The mempool crawler only stops if it panics");
}
}
Expand Down
2 changes: 1 addition & 1 deletion zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type TxVerifier = Buffer<
>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;

#[derive(Debug)]
#[derive(Debug, Eq, PartialEq)]
#[allow(dead_code)]
pub enum Request {
TransactionIds,
Expand Down
72 changes: 58 additions & 14 deletions zebrad/src/components/mempool/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{task::JoinHandle, time::sleep};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};

use zebra_network::{Request, Response};
use zebra_network as zn;

use super::super::sync::SyncStatus;
use super::{
super::{mempool, sync::SyncStatus},
downloads::Gossip,
};

#[cfg(test)]
mod tests;
Expand All @@ -31,20 +34,30 @@ const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);
const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// The mempool transaction crawler.
pub struct Crawler<S> {
peer_set: Timeout<S>,
pub struct Crawler<PeerSet, Mempool> {
peer_set: Timeout<PeerSet>,
mempool: Mempool,
status: SyncStatus,
}

impl<S> Crawler<S>
impl<PeerSet, Mempool> Crawler<PeerSet, Mempool>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
PeerSet:
Service<zn::Request, Response = zn::Response, Error = BoxError> + Clone + Send + 'static,
PeerSet::Future: Send,
Mempool:
Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
Mempool::Future: Send,
{
/// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle<Result<(), BoxError>> {
pub fn spawn(
peer_set: PeerSet,
mempool: Mempool,
status: SyncStatus,
) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler {
peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
mempool,
status,
};

Expand Down Expand Up @@ -79,13 +92,13 @@ where
// end the task on permanent peer set errors
let peer_set = peer_set.ready_and().await?;

requests.push(peer_set.call(Request::MempoolTransactionIds));
requests.push(peer_set.call(zn::Request::MempoolTransactionIds));
}

while let Some(result) = requests.next().await {
// log individual response errors
match result {
Ok(response) => self.handle_response(response).await,
Ok(response) => self.handle_response(response).await?,
// TODO: Reduce the log level of the errors (#2655).
Err(error) => info!("Failed to crawl peer for mempool transactions: {}", error),
}
Expand All @@ -95,9 +108,9 @@ where
}

/// Handle a peer's response to the crawler's request for transactions.
async fn handle_response(&mut self, response: Response) {
let transaction_ids = match response {
Response::TransactionIds(ids) => ids,
async fn handle_response(&mut self, response: zn::Response) -> Result<(), BoxError> {
let transaction_ids: Vec<_> = match response {
zn::Response::TransactionIds(ids) => ids.into_iter().map(Gossip::Id).collect(),
_ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
};

Expand All @@ -106,6 +119,37 @@ where
transaction_ids.len()
);

// TODO: Send transaction IDs to the download and verify stream (#2650)
if !transaction_ids.is_empty() {
self.queue_transactions(transaction_ids).await?;
}

Ok(())
}

/// Forward the crawled transactions IDs to the mempool transaction downloader.
async fn queue_transactions(&mut self, transaction_ids: Vec<Gossip>) -> Result<(), BoxError> {
let call_result = self
.mempool
.ready_and()
.await?
.call(mempool::Request::Queue(transaction_ids))
.await;

let queue_errors = match call_result {
Ok(mempool::Response::Queued(queue_results)) => {
queue_results.into_iter().filter_map(Result::err)
}
Ok(_) => unreachable!("Mempool did not respond with queue results to mempool crawler"),
Err(call_error) => {
debug!("Ignoring unexpected peer behavior: {}", call_error);
return Ok(());
}
};

for error in queue_errors {
debug!("Failed to download a crawled transaction: {}", error);
}

Ok(())
}
}
Loading

0 comments on commit 5d9893c

Please sign in to comment.