Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify inbound PushTransactions #2727

Merged
merged 5 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 21 additions & 35 deletions grafana/transaction_verification.json
Original file line number Diff line number Diff line change
@@ -1,34 +1,4 @@
{
"__inputs": [
{
"name": "DS_PROMETHEUS-ZEBRA",
"label": "Prometheus-Zebra",
"description": "",
"type": "datasource",
"pluginId": "prometheus",
"pluginName": "Prometheus"
}
],
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "8.1.2"
},
{
"type": "panel",
"id": "graph",
"name": "Graph (old)",
"version": ""
},
{
"type": "datasource",
"id": "prometheus",
"name": "Prometheus",
"version": "1.0.0"
}
],
"annotations": {
"list": [
{
Expand All @@ -51,8 +21,8 @@
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": null,
"iteration": 1630092146360,
"id": 5,
"iteration": 1630611911135,
"links": [],
"panels": [
{
Expand Down Expand Up @@ -117,6 +87,14 @@
"interval": "",
"legendFormat": "gossip_queued_transaction_count",
"refId": "E"
},
{
"exemplar": true,
"expr": "rate(gossip_pushed_transaction_count{job=\"$job\"}[1m]) * 60",
"hide": false,
"interval": "",
"legendFormat": "gossip_pushed_transaction_count per min",
"refId": "A"
}
],
"thresholds": [],
Expand Down Expand Up @@ -169,8 +147,16 @@
"list": [
{
"allValue": null,
"current": {},
"datasource": "${DS_PROMETHEUS-ZEBRA}",
"current": {
"selected": true,
"text": [
"All"
],
"value": [
"$__all"
]
},
"datasource": "Prometheus-Zebra",
"definition": "label_values(zcash_chain_verified_block_height, job)",
"description": null,
"error": null,
Expand Down Expand Up @@ -203,5 +189,5 @@
"timezone": "",
"title": "transaction verification",
"uid": "oBEHvS4nz",
"version": 2
"version": 4
}
16 changes: 11 additions & 5 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,18 +352,24 @@ impl Service<zn::Request> for Inbound {
})
.boxed()
}
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
// TODO: send to Tx Download & Verify Stream
// https://github.com/ZcashFoundation/zebra/issues/2692
zn::Request::PushTransaction(transaction) => {
if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup {
// TODO: check if we're close to the tip before proceeding?
// what do we do if it's not?
mpguerra marked this conversation as resolved.
Show resolved Hide resolved
dconnolly marked this conversation as resolved.
Show resolved Hide resolved
tx_downloads.download_if_needed_and_verify(transaction.into());
} else {
info!(
"ignoring `AdvertiseTransactionIds` request from remote peer during network setup"
);
}
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactionIds(transactions) => {
if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup {
// TODO: check if we're close to the tip before proceeding?
// what do we do if it's not?
for txid in transactions {
tx_downloads.download_and_verify(txid);
tx_downloads.download_if_needed_and_verify(txid.into());
}
} else {
info!(
Expand Down
81 changes: 61 additions & 20 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::{sync::oneshot, task::JoinHandle};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::transaction::UnminedTxId;
use zebra_chain::transaction::{UnminedTx, UnminedTxId};
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_state as zs;
Expand Down Expand Up @@ -83,6 +83,34 @@ pub enum DownloadAction {
FullQueue,
}

/// A gossiped transaction, which can be the transaction itself or just its ID.
pub enum GossipedTx {
Id(UnminedTxId),
Tx(UnminedTx),
}

impl GossipedTx {
/// Return the [`UnminedTxId`] of a gossiped transaction.
fn id(&self) -> UnminedTxId {
match self {
GossipedTx::Id(txid) => *txid,
GossipedTx::Tx(tx) => tx.id,
}
}
}

impl From<UnminedTxId> for GossipedTx {
fn from(txid: UnminedTxId) -> Self {
GossipedTx::Id(txid)
}
}

impl From<UnminedTx> for GossipedTx {
fn from(tx: UnminedTx) -> Self {
GossipedTx::Tx(tx)
}
}

/// Represents a [`Stream`] of download and verification tasks.
#[pin_project]
#[derive(Debug)]
Expand Down Expand Up @@ -194,11 +222,13 @@ where
}
}

/// Queue a transaction for download and verification.
/// Queue a transaction for download (if needed) and verification.
///
/// Returns the action taken in response to the queue request.
#[instrument(skip(self, txid), fields(txid = %txid))]
pub fn download_and_verify(&mut self, txid: UnminedTxId) -> DownloadAction {
#[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
pub fn download_if_needed_and_verify(&mut self, gossiped_tx: GossipedTx) -> DownloadAction {
let txid = gossiped_tx.id();

if self.cancel_handles.contains_key(&txid) {
tracing::debug!(
?txid,
Expand Down Expand Up @@ -228,7 +258,7 @@ where
let mut mempool = self.mempool.clone();

let fut = async move {
Self::should_download(&mut state, &mut mempool, txid).await?;
Self::should_download_or_verify(&mut state, &mut mempool, txid).await?;
dconnolly marked this conversation as resolved.
Show resolved Hide resolved

let height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
Expand All @@ -238,19 +268,29 @@ where
}?;
let height = (height + 1).ok_or_else(|| eyre!("no next height"))?;

let tx = if let zn::Response::Transactions(txs) = network
.oneshot(zn::Request::TransactionsById(
std::iter::once(txid).collect(),
))
.await?
{
txs.into_iter()
.next()
.expect("successful response has the transaction in it")
} else {
unreachable!("wrong response to transaction request");
let tx = match gossiped_tx {
GossipedTx::Id(txid) => {
let tx = if let zn::Response::Transactions(txs) = network
.oneshot(zn::Request::TransactionsById(
std::iter::once(txid).collect(),
))
.await?
{
txs.into_iter()
.next()
.expect("successful response has the transaction in it")
} else {
unreachable!("wrong response to transaction request");
};
metrics::counter!("gossip.downloaded.transaction.count", 1);

tx
}
GossipedTx::Tx(tx) => {
metrics::counter!("gossip.pushed.transaction.count", 1);
tx
}
};
metrics::counter!("gossip.downloaded.transaction.count", 1);

let result = verifier
.oneshot(tx::Request::Mempool {
Expand Down Expand Up @@ -302,11 +342,12 @@ where
DownloadAction::AddedToQueue
}

/// Check if transaction should be downloaded and verified.
/// Check if transaction should be downloaded and/or verified.
///
/// If it is already in the mempool (or in its rejected list)
/// or in state, then it shouldn't be downloaded (and an error is returned).
async fn should_download(
/// or in state, then it shouldn't be downloaded/verified
/// (and an error is returned).
async fn should_download_or_verify(
state: &mut ZS,
mempool: &mut ZM,
txid: UnminedTxId,
Expand Down