Skip to content

Commit

Permalink
Merge of #5841
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 12, 2022
2 parents 9a6a11c + 3cfde8c commit 6b5251d
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 65 deletions.
14 changes: 4 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,19 +235,13 @@ So Zebra's state should always be valid, unless your OS or disk hardware is corr

There are a few bugs in Zebra that we're still working on fixing:

- Zebra falsely estimates that it's close to the tip when the network connection goes down [#4649](https://github.com/ZcashFoundation/zebra/issues/4649)

- One of the consequences of this issue is that Zebra might add unwanted load
to other peers when the connection goes back up. This load will last only
for a short period of time because Zebra will quickly find out that it's
still not close to the tip.
- Zebra falsely estimates that it's close to the tip when the network connection goes down [#4649](https://github.com/ZcashFoundation/zebra/issues/4649).

- If Zebra fails downloading the Zcash parameters, use [the Zcash parameters download script](https://github.com/zcash/zcash/blob/master/zcutil/fetch-params.sh) instead. This script might be needed on macOS, even with Rust stable.
- No Windows support [#3801](https://github.com/ZcashFoundation/zebra/issues/3801)
- We used to test with Windows Server 2019, but not anymore; see issue for details

- Experimental Tor support is disabled until [`arti-client` upgrades to `x25519-dalek` 2.0.0 or later](https://github.com/ZcashFoundation/zebra/issues/5492)
- This happens due to a Rust dependency conflict, which can only be resolved by changing the dependencies of `x25519-dalek`
- No Windows support [#3801](https://github.com/ZcashFoundation/zebra/issues/3801). We used to test with Windows Server 2019, but not anymore; see the issue for details.

- Experimental Tor support is disabled until [`arti-client` upgrades to `x25519-dalek` 2.0.0 or later](https://github.com/ZcashFoundation/zebra/issues/5492). This happens due to a Rust dependency conflict, which can only be resolved by upgrading to a version of `x25519-dalek` with the dependency fix.

- Output of `help`, `--help` flag, and usage of invalid commands or options are inconsistent. Reports of these issues can be found [here](https://github.com/ZcashFoundation/zebra/issues/5502) and are planned to be fixed in the context of [upgrading Abscissa](https://github.com/ZcashFoundation/zebra/issues/5502).

Expand Down
9 changes: 9 additions & 0 deletions zebra-chain/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,19 @@ impl fmt::Display for Transaction {
let mut fmter = f.debug_struct("Transaction");

fmter.field("version", &self.version());

if let Some(network_upgrade) = self.network_upgrade() {
fmter.field("network_upgrade", &network_upgrade);
}

if let Some(lock_time) = self.lock_time() {
fmter.field("lock_time", &lock_time);
}

if let Some(expiry_height) = self.expiry_height() {
fmter.field("expiry_height", &expiry_height);
}

fmter.field("transparent_inputs", &self.inputs().len());
fmter.field("transparent_outputs", &self.outputs().len());
fmter.field("sprout_joinsplits", &self.joinsplit_count());
Expand Down
4 changes: 2 additions & 2 deletions zebra-chain/src/transaction/unmined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub struct UnminedTx {
impl fmt::Display for UnminedTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UnminedTx")
.field("transaction", &self.transaction)
.field("transaction", &self.transaction.to_string())
.field("serialized_size", &self.size)
.field("conventional_fee", &self.conventional_fee)
.finish()
Expand Down Expand Up @@ -327,7 +327,7 @@ pub struct VerifiedUnminedTx {
impl fmt::Display for VerifiedUnminedTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("VerifiedUnminedTx")
.field("transaction", &self.transaction)
.field("transaction", &self.transaction.to_string())
.field("miner_fee", &self.miner_fee)
.field("legacy_sigop_count", &self.legacy_sigop_count)
.field("unpaid_actions", &self.unpaid_actions)
Expand Down
12 changes: 11 additions & 1 deletion zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! * [LatestChainTip] for efficient access to the current best tip, and
//! * [ChainTipChange] to `await` specific changes to the chain tip.
use std::sync::Arc;
use std::{fmt, sync::Arc};

use chrono::{DateTime, Utc};
use tokio::sync::watch;
Expand Down Expand Up @@ -73,6 +73,16 @@ pub struct ChainTipBlock {
pub previous_block_hash: block::Hash,
}

impl fmt::Display for ChainTipBlock {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ChainTipBlock")
.field("height", &self.height)
.field("hash", &self.hash)
.field("transactions", &self.transactions.len())
.finish()
}
}

impl From<ContextuallyValidBlock> for ChainTipBlock {
fn from(contextually_valid: ContextuallyValidBlock) -> Self {
let ContextuallyValidBlock {
Expand Down
5 changes: 3 additions & 2 deletions zebra-test/src/mock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Er
}
}

/// A helper method to get the next request from the queue.
/// Returns the next request from the queue,
/// or panics if there are no requests after a short timeout.
///
/// Returns the next request in the internal queue or waits at most the max delay time
/// configured by [`MockServiceBuilder::with_max_request_delay`] for a new request to be
Expand Down Expand Up @@ -687,7 +688,7 @@ impl<Request, Response, Assertion, Error> MockService<Request, Response, Asserti
/// If too many requests are received and the queue fills up, the oldest requests are dropped
/// and ignored. This means that calling this may not receive the next request if the queue is
/// not dimensioned properly with the [`MockServiceBuilder::with_proxy_channel_size`] method.
async fn try_next_request(&mut self) -> Option<ResponseSender<Request, Response, Error>> {
pub async fn try_next_request(&mut self) -> Option<ResponseSender<Request, Response, Error>> {
loop {
match timeout(self.max_request_delay, self.receiver.recv()).await {
Ok(Ok(item)) => {
Expand Down
57 changes: 44 additions & 13 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use zebra_chain::{
};
use zebra_consensus::{error::TransactionError, transaction};
use zebra_network as zn;
use zebra_node_services::mempool::{Request, Response};
use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};

Expand Down Expand Up @@ -107,19 +107,32 @@ impl Default for ActiveState {
}
}

impl Drop for ActiveState {
fn drop(&mut self) {
if let ActiveState::Enabled { tx_downloads, .. } = self {
tx_downloads.cancel_all();
}
}
}

impl ActiveState {
/// Returns the current state, leaving [`Self::Disabled`] in its place.
fn take(&mut self) -> Self {
std::mem::take(self)
}

/// Returns a list of requests that will retry every stored and pending transaction.
fn transaction_retry_requests(&self) -> Vec<Gossip> {
match self {
ActiveState::Disabled => Vec::new(),
ActiveState::Enabled {
storage,
tx_downloads,
} => {
let mut transactions = Vec::new();

let storage = storage.transactions().map(|tx| tx.clone().into());
transactions.extend(storage);

let pending = tx_downloads.transaction_requests().cloned();
transactions.extend(pending);

transactions
}
}
}
}

/// Mempool async management and query service.
Expand Down Expand Up @@ -259,8 +272,8 @@ impl Mempool {
"deactivating mempool: Zebra is syncing lots of blocks"
);

// This drops the previous ActiveState::Enabled,
// cancelling its download tasks.
// This drops the previous ActiveState::Enabled, cancelling its download tasks.
// We don't preserve the previous transactions, because we are syncing lots of blocks.
self.active_state = ActiveState::Disabled
}

Expand Down Expand Up @@ -319,17 +332,35 @@ impl Service<Request> for Mempool {
"resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
);

let previous_state = self.active_state.take();
let tx_retries = previous_state.transaction_retry_requests();

// Use the same code for dropping and resetting the mempool,
// to avoid subtle bugs.

//
// Drop the current contents of the state,
// cancelling any pending download tasks,
// and dropping completed verification results.
std::mem::drop(self.active_state.take());
std::mem::drop(previous_state);

// Re-initialise an empty state.
self.update_state();

// Re-verify the transactions that were pending or valid at the previous tip.
// This saves us the time and data needed to re-download them.
if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
info!(
transactions = tx_retries.len(),
"re-verifying mempool transactions after a chain fork"
);

for tx in tx_retries {
// This is just an efficiency optimisation, so we don't care if queueing
// transaction requests fails.
let _result = tx_downloads.download_if_needed_and_verify(tx);
}
}

return Poll::Ready(Ok(()));
}

Expand Down
33 changes: 24 additions & 9 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
/// Therefore, this attack can be carried out by a single malicious node.
pub const MAX_INBOUND_CONCURRENCY: usize = 10;

/// A marker struct for the oneshot channels which cancel a pending download and verify.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct CancelDownloadAndVerify;

/// Errors that can occur while downloading and verifying a transaction.
#[derive(Error, Debug)]
#[allow(dead_code)]
Expand All @@ -122,7 +126,7 @@ pub enum TransactionDownloadVerifyError {
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
Expand All @@ -148,8 +152,8 @@ where
>,

/// A list of channels that can be used to cancel pending transaction download and
/// verify tasks.
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
/// verify tasks. Each channel also has the corresponding request.
cancel_handles: HashMap<UnminedTxId, (oneshot::Sender<CancelDownloadAndVerify>, Gossip)>,
}

impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
Expand Down Expand Up @@ -264,12 +268,14 @@ where
}

// This oneshot is used to signal cancellation to the download task.
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelDownloadAndVerify>();

let network = self.network.clone();
let verifier = self.verifier.clone();
let mut state = self.state.clone();

let gossiped_tx_req = gossiped_tx.clone();

let fut = async move {
// Don't download/verify if the transaction is already in the best chain.
Self::transaction_in_best_chain(&mut state, txid).await?;
Expand Down Expand Up @@ -378,7 +384,9 @@ where

self.pending.push(task);
assert!(
self.cancel_handles.insert(txid, cancel_tx).is_none(),
self.cancel_handles
.insert(txid, (cancel_tx, gossiped_tx_req))
.is_none(),
"transactions are only queued once"
);

Expand Down Expand Up @@ -411,7 +419,7 @@ where

for txid in removed_txids {
if let Some(handle) = self.cancel_handles.remove(&txid) {
let _ = handle.send(());
let _ = handle.0.send(CancelDownloadAndVerify);
}
}
}
Expand All @@ -425,7 +433,7 @@ where
// Since we already dropped the JoinHandles above, they should
// fail silently.
for (_hash, cancel) in self.cancel_handles.drain() {
let _ = cancel.send(());
let _ = cancel.0.send(CancelDownloadAndVerify);
}
assert!(self.pending.is_empty());
assert!(self.cancel_handles.is_empty());
Expand All @@ -442,6 +450,11 @@ where
self.pending.len()
}

/// Get a list of the currently pending transaction requests.
pub fn transaction_requests(&self) -> impl Iterator<Item = &Gossip> {
self.cancel_handles.iter().map(|(_tx_id, (_handle, tx))| tx)
}

/// Check if transaction is already in the best chain.
async fn transaction_in_best_chain(
state: &mut ZS,
Expand All @@ -467,14 +480,16 @@ where
#[pinned_drop]
impl<ZN, ZV, ZS> PinnedDrop for Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
fn drop(self: Pin<&mut Self>) {
fn drop(mut self: Pin<&mut Self>) {
self.cancel_all();

metrics::gauge!("mempool.currently.queued.transactions", 0 as f64);
}
}
Loading

0 comments on commit 6b5251d

Please sign in to comment.